mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-18 06:30:07 +08:00
Merge pull request #1087 from joeslay/master
Storing files in in-memory collections on windows
This commit is contained in:
commit
c262526d8a
4
go.mod
4
go.mod
@ -1,7 +1,7 @@
|
||||
module github.com/chrislusf/seaweedfs
|
||||
|
||||
go 1.12
|
||||
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.44.3
|
||||
contrib.go.opencensus.io/exporter/aws v0.0.0-20190807220307-c50fb1bd7f21 // indirect
|
||||
@ -119,4 +119,4 @@ require (
|
||||
pack.ag/amqp v0.12.1 // indirect
|
||||
)
|
||||
|
||||
replace github.com/satori/go.uuid v1.2.0 => github.com/satori/go.uuid v0.0.0-20181028125025-b2ce2384e17b
|
||||
replace github.com/satori/go.uuid v1.2.0 => github.com/satori/go.uuid v0.0.0-20181028125025-b2ce2384e17b
|
3
go.sum
3
go.sum
@ -98,6 +98,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92 h1:lM9SFsh0EPXkyJyrTJqLZPAIJBtNFP6LNkYXu2MnSZI=
|
||||
github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92/go.mod h1:4jyiUCD5y548+yKW+oiHtccBiMaLCCbFBpK2t7X4eUo=
|
||||
github.com/chrislusf/seaweedfs v0.0.0-20190912032620-ae53f636804e h1:PmqW1XGq0V6KnwOFa3hOSqsqa/bH66zxWzCVMOo5Yi4=
|
||||
github.com/chrislusf/seaweedfs v0.0.0-20190912032620-ae53f636804e/go.mod h1:e5Pz27e2DxLCFt6GbCBP5/qJygD4TkOL5xqSFYFq+2U=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
|
||||
github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s=
|
||||
@ -262,6 +264,7 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U
|
||||
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/joeslay/seaweedfs v0.0.0-20190912104409-d8c34b032fb6/go.mod h1:ljVry+CyFSNBLlKiell2UlxOKCvXXHjyBhiGDzXa+0c=
|
||||
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
|
||||
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
|
||||
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
|
||||
|
@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0)
|
||||
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
|
||||
return true
|
||||
@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool {
|
||||
// remove the old data
|
||||
v.Destroy()
|
||||
// recreate an empty volume
|
||||
v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0)
|
||||
v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
|
||||
return true
|
||||
|
@ -38,7 +38,7 @@ func runCompact(cmd *Command, args []string) bool {
|
||||
|
||||
vid := needle.VolumeId(*compactVolumeId)
|
||||
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
|
||||
storage.NeedleMapInMemory, nil, nil, preallocate)
|
||||
storage.NeedleMapInMemory, nil, nil, preallocate, 0)
|
||||
if err != nil {
|
||||
glog.Fatalf("Load Volume [ERROR] %s\n", err)
|
||||
}
|
||||
|
168
weed/os_overloads/file_windows.go
Normal file
168
weed/os_overloads/file_windows.go
Normal file
@ -0,0 +1,168 @@
|
||||
// Copyright 2009 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package os_overloads
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
func isAbs(path string) (b bool) {
|
||||
v := volumeName(path)
|
||||
if v == "" {
|
||||
return false
|
||||
}
|
||||
path = path[len(v):]
|
||||
if path == "" {
|
||||
return false
|
||||
}
|
||||
return os.IsPathSeparator(path[0])
|
||||
}
|
||||
|
||||
func volumeName(path string) (v string) {
|
||||
if len(path) < 2 {
|
||||
return ""
|
||||
}
|
||||
// with drive letter
|
||||
c := path[0]
|
||||
if path[1] == ':' &&
|
||||
('0' <= c && c <= '9' || 'a' <= c && c <= 'z' ||
|
||||
'A' <= c && c <= 'Z') {
|
||||
return path[:2]
|
||||
}
|
||||
// is it UNC
|
||||
if l := len(path); l >= 5 && os.IsPathSeparator(path[0]) && os.IsPathSeparator(path[1]) &&
|
||||
!os.IsPathSeparator(path[2]) && path[2] != '.' {
|
||||
// first, leading `\\` and next shouldn't be `\`. its server name.
|
||||
for n := 3; n < l-1; n++ {
|
||||
// second, next '\' shouldn't be repeated.
|
||||
if os.IsPathSeparator(path[n]) {
|
||||
n++
|
||||
// third, following something characters. its share name.
|
||||
if !os.IsPathSeparator(path[n]) {
|
||||
if path[n] == '.' {
|
||||
break
|
||||
}
|
||||
for ; n < l; n++ {
|
||||
if os.IsPathSeparator(path[n]) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return path[:n]
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// fixLongPath returns the extended-length (\\?\-prefixed) form of
|
||||
// path when needed, in order to avoid the default 260 character file
|
||||
// path limit imposed by Windows. If path is not easily converted to
|
||||
// the extended-length form (for example, if path is a relative path
|
||||
// or contains .. elements), or is short enough, fixLongPath returns
|
||||
// path unmodified.
|
||||
//
|
||||
// See https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
|
||||
func fixLongPath(path string) string {
|
||||
// Do nothing (and don't allocate) if the path is "short".
|
||||
// Empirically (at least on the Windows Server 2013 builder),
|
||||
// the kernel is arbitrarily okay with < 248 bytes. That
|
||||
// matches what the docs above say:
|
||||
// "When using an API to create a directory, the specified
|
||||
// path cannot be so long that you cannot append an 8.3 file
|
||||
// name (that is, the directory name cannot exceed MAX_PATH
|
||||
// minus 12)." Since MAX_PATH is 260, 260 - 12 = 248.
|
||||
//
|
||||
// The MSDN docs appear to say that a normal path that is 248 bytes long
|
||||
// will work; empirically the path must be less then 248 bytes long.
|
||||
if len(path) < 248 {
|
||||
// Don't fix. (This is how Go 1.7 and earlier worked,
|
||||
// not automatically generating the \\?\ form)
|
||||
return path
|
||||
}
|
||||
|
||||
// The extended form begins with \\?\, as in
|
||||
// \\?\c:\windows\foo.txt or \\?\UNC\server\share\foo.txt.
|
||||
// The extended form disables evaluation of . and .. path
|
||||
// elements and disables the interpretation of / as equivalent
|
||||
// to \. The conversion here rewrites / to \ and elides
|
||||
// . elements as well as trailing or duplicate separators. For
|
||||
// simplicity it avoids the conversion entirely for relative
|
||||
// paths or paths containing .. elements. For now,
|
||||
// \\server\share paths are not converted to
|
||||
// \\?\UNC\server\share paths because the rules for doing so
|
||||
// are less well-specified.
|
||||
if len(path) >= 2 && path[:2] == `\\` {
|
||||
// Don't canonicalize UNC paths.
|
||||
return path
|
||||
}
|
||||
if !isAbs(path) {
|
||||
// Relative path
|
||||
return path
|
||||
}
|
||||
|
||||
const prefix = `\\?`
|
||||
|
||||
pathbuf := make([]byte, len(prefix)+len(path)+len(`\`))
|
||||
copy(pathbuf, prefix)
|
||||
n := len(path)
|
||||
r, w := 0, len(prefix)
|
||||
for r < n {
|
||||
switch {
|
||||
case os.IsPathSeparator(path[r]):
|
||||
// empty block
|
||||
r++
|
||||
case path[r] == '.' && (r+1 == n || os.IsPathSeparator(path[r+1])):
|
||||
// /./
|
||||
r++
|
||||
case r+1 < n && path[r] == '.' && path[r+1] == '.' && (r+2 == n || os.IsPathSeparator(path[r+2])):
|
||||
// /../ is currently unhandled
|
||||
return path
|
||||
default:
|
||||
pathbuf[w] = '\\'
|
||||
w++
|
||||
for ; r < n && !os.IsPathSeparator(path[r]); r++ {
|
||||
pathbuf[w] = path[r]
|
||||
w++
|
||||
}
|
||||
}
|
||||
}
|
||||
// A drive's root directory needs a trailing \
|
||||
if w == len(`\\?\c:`) {
|
||||
pathbuf[w] = '\\'
|
||||
w++
|
||||
}
|
||||
return string(pathbuf[:w])
|
||||
}
|
||||
|
||||
// syscallMode returns the syscall-specific mode bits from Go's portable mode bits.
|
||||
func syscallMode(i os.FileMode) (o uint32) {
|
||||
o |= uint32(i.Perm())
|
||||
if i&os.ModeSetuid != 0 {
|
||||
o |= syscall.S_ISUID
|
||||
}
|
||||
if i&os.ModeSetgid != 0 {
|
||||
o |= syscall.S_ISGID
|
||||
}
|
||||
if i&os.ModeSticky != 0 {
|
||||
o |= syscall.S_ISVTX
|
||||
}
|
||||
// No mapping for Go's ModeTemporary (plan9 only).
|
||||
return
|
||||
}
|
||||
|
||||
//If the bool is set to true then the file is opened with the parameters FILE_ATTRIBUTE_TEMPORARY and
|
||||
// FILE_FLAG_DELETE_ON_CLOSE
|
||||
func OpenFile(name string, flag int, perm os.FileMode, setToTempAndDelete bool) (file *os.File, err error) {
|
||||
r, e := Open(fixLongPath(name), flag|windows.O_CLOEXEC, syscallMode(perm), setToTempAndDelete)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
return os.NewFile(uintptr(r), name), nil
|
||||
}
|
80
weed/os_overloads/syscall_windows.go
Normal file
80
weed/os_overloads/syscall_windows.go
Normal file
@ -0,0 +1,80 @@
|
||||
// Copyright 2009 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Windows system calls.
|
||||
|
||||
package os_overloads
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
// windows api calls
|
||||
|
||||
//sys CreateFile(name *uint16, access uint32, mode uint32, sa *SecurityAttributes, createmode uint32, attrs uint32, templatefile int32) (handle Handle, err error) [failretval==InvalidHandle] = CreateFileW
|
||||
|
||||
func makeInheritSa() *syscall.SecurityAttributes {
|
||||
var sa syscall.SecurityAttributes
|
||||
sa.Length = uint32(unsafe.Sizeof(sa))
|
||||
sa.InheritHandle = 1
|
||||
return &sa
|
||||
}
|
||||
|
||||
// opens the
|
||||
func Open(path string, mode int, perm uint32, setToTempAndDelete bool) (fd syscall.Handle, err error) {
|
||||
if len(path) == 0 {
|
||||
return syscall.InvalidHandle, windows.ERROR_FILE_NOT_FOUND
|
||||
}
|
||||
pathp, err := syscall.UTF16PtrFromString(path)
|
||||
if err != nil {
|
||||
return syscall.InvalidHandle, err
|
||||
}
|
||||
var access uint32
|
||||
switch mode & (windows.O_RDONLY | windows.O_WRONLY | windows.O_RDWR) {
|
||||
case windows.O_RDONLY:
|
||||
access = windows.GENERIC_READ
|
||||
case windows.O_WRONLY:
|
||||
access = windows.GENERIC_WRITE
|
||||
case windows.O_RDWR:
|
||||
access = windows.GENERIC_READ | windows.GENERIC_WRITE
|
||||
}
|
||||
if mode&windows.O_CREAT != 0 {
|
||||
access |= windows.GENERIC_WRITE
|
||||
}
|
||||
if mode&windows.O_APPEND != 0 {
|
||||
access &^= windows.GENERIC_WRITE
|
||||
access |= windows.FILE_APPEND_DATA
|
||||
}
|
||||
sharemode := uint32(windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE)
|
||||
var sa *syscall.SecurityAttributes
|
||||
if mode&windows.O_CLOEXEC == 0 {
|
||||
sa = makeInheritSa()
|
||||
}
|
||||
var createmode uint32
|
||||
switch {
|
||||
case mode&(windows.O_CREAT|windows.O_EXCL) == (windows.O_CREAT | windows.O_EXCL):
|
||||
createmode = windows.CREATE_NEW
|
||||
case mode&(windows.O_CREAT|windows.O_TRUNC) == (windows.O_CREAT | windows.O_TRUNC):
|
||||
createmode = windows.CREATE_ALWAYS
|
||||
case mode&windows.O_CREAT == windows.O_CREAT:
|
||||
createmode = windows.OPEN_ALWAYS
|
||||
case mode&windows.O_TRUNC == windows.O_TRUNC:
|
||||
createmode = windows.TRUNCATE_EXISTING
|
||||
default:
|
||||
createmode = windows.OPEN_EXISTING
|
||||
}
|
||||
|
||||
var h syscall.Handle
|
||||
var e error
|
||||
|
||||
if setToTempAndDelete {
|
||||
h, e = syscall.CreateFile(pathp, access, sharemode, sa, createmode, (windows.FILE_ATTRIBUTE_TEMPORARY | FILE_FLAG_DELETE_ON_CLOSE), 0)
|
||||
} else {
|
||||
h, e = syscall.CreateFile(pathp, access, sharemode, sa, createmode, windows.FILE_ATTRIBUTE_NORMAL, 0)
|
||||
}
|
||||
return h, e
|
||||
}
|
9
weed/os_overloads/types_windows.go
Normal file
9
weed/os_overloads/types_windows.go
Normal file
@ -0,0 +1,9 @@
|
||||
// Copyright 2011 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package os_overloads
|
||||
|
||||
const (
|
||||
FILE_FLAG_DELETE_ON_CLOSE = 0x04000000
|
||||
)
|
@ -139,6 +139,7 @@ message AssignRequest {
|
||||
string data_center = 5;
|
||||
string rack = 6;
|
||||
string data_node = 7;
|
||||
uint32 MemoryMapMaxSizeMB = 8;
|
||||
}
|
||||
message AssignResponse {
|
||||
string fid = 1;
|
||||
|
@ -44,12 +44,15 @@ It has these top-level messages:
|
||||
*/
|
||||
package master_pb
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
|
||||
math "math"
|
||||
|
||||
context "golang.org/x/net/context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
@ -648,13 +651,14 @@ func (m *Location) GetPublicUrl() string {
|
||||
}
|
||||
|
||||
type AssignRequest struct {
|
||||
Count uint64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
|
||||
Replication string `protobuf:"bytes,2,opt,name=replication" json:"replication,omitempty"`
|
||||
Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
|
||||
Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"`
|
||||
DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"`
|
||||
Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"`
|
||||
DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"`
|
||||
Count uint64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
|
||||
Replication string `protobuf:"bytes,2,opt,name=replication" json:"replication,omitempty"`
|
||||
Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
|
||||
Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"`
|
||||
DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"`
|
||||
Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"`
|
||||
DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"`
|
||||
MemoryMapMaxSizeMB uint32 `protobuf:"varint,8,opt,name=memorymapmaxsizemb" json:"memorymapmaxsizemb,omitempty"`
|
||||
}
|
||||
|
||||
func (m *AssignRequest) Reset() { *m = AssignRequest{} }
|
||||
@ -711,6 +715,13 @@ func (m *AssignRequest) GetDataNode() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *AssignRequest) GetMemoryMapMaxSizeMB() uint32 {
|
||||
if m != nil {
|
||||
return m.MemoryMapMaxSizeMB
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type AssignResponse struct {
|
||||
Fid string `protobuf:"bytes,1,opt,name=fid" json:"fid,omitempty"`
|
||||
Url string `protobuf:"bytes,2,opt,name=url" json:"url,omitempty"`
|
||||
|
@ -131,6 +131,7 @@ message AllocateVolumeRequest {
|
||||
int64 preallocate = 3;
|
||||
string replication = 4;
|
||||
string ttl = 5;
|
||||
int32 memorymapmaxsizemb = 6;
|
||||
}
|
||||
message AllocateVolumeResponse {
|
||||
}
|
||||
|
@ -70,12 +70,15 @@ It has these top-level messages:
|
||||
*/
|
||||
package volume_server_pb
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
|
||||
math "math"
|
||||
|
||||
context "golang.org/x/net/context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
@ -315,11 +318,12 @@ func (*DeleteCollectionResponse) ProtoMessage() {}
|
||||
func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
|
||||
|
||||
type AllocateVolumeRequest struct {
|
||||
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
|
||||
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
|
||||
Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"`
|
||||
Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"`
|
||||
Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"`
|
||||
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
|
||||
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
|
||||
Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"`
|
||||
Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"`
|
||||
Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"`
|
||||
MemoryMapMaxSizeMB uint32 `protobuf:"varint,6,opt,name=memorymapmaxsizemb" json:"memorymapmaxsizemb,omitempty"`
|
||||
}
|
||||
|
||||
func (m *AllocateVolumeRequest) Reset() { *m = AllocateVolumeRequest{} }
|
||||
@ -362,6 +366,13 @@ func (m *AllocateVolumeRequest) GetTtl() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *AllocateVolumeRequest) GetMemoryMapMaxSizeMB() uint32 {
|
||||
if m != nil {
|
||||
return m.MemoryMapMaxSizeMB
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type AllocateVolumeResponse struct {
|
||||
}
|
||||
|
||||
@ -2359,6 +2370,7 @@ func _VolumeServer_DeleteCollection_Handler(srv interface{}, ctx context.Context
|
||||
|
||||
func _VolumeServer_AllocateVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(AllocateVolumeRequest)
|
||||
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -62,13 +62,14 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
|
||||
}
|
||||
|
||||
option := &topology.VolumeGrowOption{
|
||||
Collection: req.Collection,
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: ttl,
|
||||
Prealloacte: ms.preallocateSize,
|
||||
DataCenter: req.DataCenter,
|
||||
Rack: req.Rack,
|
||||
DataNode: req.DataNode,
|
||||
Collection: req.Collection,
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: ttl,
|
||||
Prealloacte: ms.preallocateSize,
|
||||
DataCenter: req.DataCenter,
|
||||
Rack: req.Rack,
|
||||
DataNode: req.DataNode,
|
||||
MemoryMapMaxSizeMB: req.MemoryMapMaxSizeMB,
|
||||
}
|
||||
|
||||
if !ms.Topo.HasWritableVolume(option) {
|
||||
|
@ -148,6 +148,11 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
memoryMapMaxSizeMB, err := needle.ReadMemoryMapMaxSizeMB(r.FormValue("memorymapmaxsizemb"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
preallocate := ms.preallocateSize
|
||||
if r.FormValue("preallocate") != "" {
|
||||
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
|
||||
@ -156,13 +161,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
|
||||
}
|
||||
}
|
||||
volumeGrowOption := &topology.VolumeGrowOption{
|
||||
Collection: r.FormValue("collection"),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: ttl,
|
||||
Prealloacte: preallocate,
|
||||
DataCenter: r.FormValue("dataCenter"),
|
||||
Rack: r.FormValue("rack"),
|
||||
DataNode: r.FormValue("dataNode"),
|
||||
Collection: r.FormValue("collection"),
|
||||
ReplicaPlacement: replicaPlacement,
|
||||
Ttl: ttl,
|
||||
Prealloacte: preallocate,
|
||||
DataCenter: r.FormValue("dataCenter"),
|
||||
Rack: r.FormValue("rack"),
|
||||
DataNode: r.FormValue("dataNode"),
|
||||
MemoryMapMaxSizeMB: memoryMapMaxSizeMB,
|
||||
}
|
||||
return volumeGrowOption, nil
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
|
||||
req.Replication,
|
||||
req.Ttl,
|
||||
req.Preallocate,
|
||||
req.MemoryMapMaxSizeMB,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -60,7 +60,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
|
||||
_, found := l.volumes[vid]
|
||||
l.RUnlock()
|
||||
if !found {
|
||||
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil {
|
||||
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil {
|
||||
l.Lock()
|
||||
l.volumes[vid] = v
|
||||
l.Unlock()
|
||||
|
42
weed/storage/memory_map/memory_map.go
Normal file
42
weed/storage/memory_map/memory_map.go
Normal file
@ -0,0 +1,42 @@
|
||||
// +build !windows
|
||||
|
||||
package memory_map
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
type MemoryBuffer struct {
|
||||
aligned_length uint64
|
||||
length uint64
|
||||
aligned_ptr uintptr
|
||||
ptr uintptr
|
||||
Buffer []byte
|
||||
}
|
||||
|
||||
type MemoryMap struct {
|
||||
File *os.File
|
||||
file_memory_map_handle uintptr
|
||||
write_map_views []MemoryBuffer
|
||||
max_length uint64
|
||||
End_of_file int64
|
||||
}
|
||||
|
||||
var FileMemoryMap = make(map[string]*MemoryMap)
|
||||
|
||||
func (mMap *MemoryMap) CreateMemoryMap(file *os.File, maxLength uint64) {
|
||||
}
|
||||
|
||||
func (mMap *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte) {
|
||||
|
||||
}
|
||||
|
||||
func (mMap *MemoryMap) ReadMemory(offset uint64, length uint64) ([]byte, error) {
|
||||
dataSlice := []byte{}
|
||||
return dataSlice, fmt.Errorf("Memory Map not implemented for this platform")
|
||||
}
|
||||
|
||||
func (mBuffer *MemoryMap) DeleteFileAndMemoryMap() {
|
||||
|
||||
}
|
343
weed/storage/memory_map/memory_map_windows.go
Normal file
343
weed/storage/memory_map/memory_map_windows.go
Normal file
@ -0,0 +1,343 @@
|
||||
// +build windows
|
||||
|
||||
package memory_map
|
||||
|
||||
import (
|
||||
"os"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
type MemoryBuffer struct {
|
||||
aligned_length uint64
|
||||
length uint64
|
||||
aligned_ptr uintptr
|
||||
ptr uintptr
|
||||
Buffer []byte
|
||||
}
|
||||
|
||||
type MemoryMap struct {
|
||||
File *os.File
|
||||
file_memory_map_handle uintptr
|
||||
write_map_views []MemoryBuffer
|
||||
max_length uint64
|
||||
End_of_file int64
|
||||
}
|
||||
|
||||
var FileMemoryMap = make(map[string]*MemoryMap)
|
||||
|
||||
type DWORDLONG = uint64
|
||||
type DWORD = uint32
|
||||
type WORD = uint16
|
||||
|
||||
var (
|
||||
modkernel32 = syscall.NewLazyDLL("kernel32.dll")
|
||||
|
||||
procGetSystemInfo = modkernel32.NewProc("GetSystemInfo")
|
||||
procGlobalMemoryStatusEx = modkernel32.NewProc("GlobalMemoryStatusEx")
|
||||
procGetProcessWorkingSetSize = modkernel32.NewProc("GetProcessWorkingSetSize")
|
||||
procSetProcessWorkingSetSize = modkernel32.NewProc("SetProcessWorkingSetSize")
|
||||
)
|
||||
|
||||
var currentProcess, _ = windows.GetCurrentProcess()
|
||||
var currentMinWorkingSet uint64 = 0
|
||||
var currentMaxWorkingSet uint64 = 0
|
||||
var _ = getProcessWorkingSetSize(uintptr(currentProcess), ¤tMinWorkingSet, ¤tMaxWorkingSet)
|
||||
|
||||
var systemInfo, _ = getSystemInfo()
|
||||
var chunkSize = uint64(systemInfo.dwAllocationGranularity) * 128
|
||||
|
||||
var memoryStatusEx, _ = globalMemoryStatusEx()
|
||||
var maxMemoryLimitBytes = uint64(float64(memoryStatusEx.ullTotalPhys) * 0.8)
|
||||
|
||||
func (mMap *MemoryMap) CreateMemoryMap(file *os.File, maxLength uint64) {
|
||||
|
||||
chunks := (maxLength / chunkSize)
|
||||
if chunks*chunkSize < maxLength {
|
||||
chunks = chunks + 1
|
||||
}
|
||||
|
||||
alignedMaxLength := chunks * chunkSize
|
||||
|
||||
maxLength_high := uint32(alignedMaxLength >> 32)
|
||||
maxLength_low := uint32(alignedMaxLength & 0xFFFFFFFF)
|
||||
file_memory_map_handle, err := windows.CreateFileMapping(windows.Handle(file.Fd()), nil, windows.PAGE_READWRITE, maxLength_high, maxLength_low, nil)
|
||||
|
||||
if err == nil {
|
||||
mMap.File = file
|
||||
mMap.file_memory_map_handle = uintptr(file_memory_map_handle)
|
||||
mMap.write_map_views = make([]MemoryBuffer, 0, alignedMaxLength/chunkSize)
|
||||
mMap.max_length = alignedMaxLength
|
||||
mMap.End_of_file = -1
|
||||
}
|
||||
}
|
||||
|
||||
func (mMap *MemoryMap) DeleteFileAndMemoryMap() {
|
||||
//First we close the file handles first to delete the file,
|
||||
//Then we unmap the memory to ensure the unmapping process doesn't write the data to disk
|
||||
windows.CloseHandle(windows.Handle(mMap.file_memory_map_handle))
|
||||
windows.CloseHandle(windows.Handle(mMap.File.Fd()))
|
||||
|
||||
for _, view := range mMap.write_map_views {
|
||||
view.releaseMemory()
|
||||
}
|
||||
|
||||
mMap.write_map_views = nil
|
||||
mMap.max_length = 0
|
||||
}
|
||||
|
||||
func min(x, y uint64) uint64 {
|
||||
if x < y {
|
||||
return x
|
||||
}
|
||||
return y
|
||||
}
|
||||
|
||||
func (mMap *MemoryMap) WriteMemory(offset uint64, length uint64, data []byte) {
|
||||
|
||||
for {
|
||||
if ((offset+length)/chunkSize)+1 > uint64(len(mMap.write_map_views)) {
|
||||
allocateChunk(mMap)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
remaining_length := length
|
||||
sliceIndex := offset / chunkSize
|
||||
sliceOffset := offset - (sliceIndex * chunkSize)
|
||||
dataOffset := uint64(0)
|
||||
|
||||
for {
|
||||
writeEnd := min((remaining_length + sliceOffset), chunkSize)
|
||||
copy(mMap.write_map_views[sliceIndex].Buffer[sliceOffset:writeEnd], data[dataOffset:])
|
||||
remaining_length -= (writeEnd - sliceOffset)
|
||||
dataOffset += (writeEnd - sliceOffset)
|
||||
|
||||
if remaining_length > 0 {
|
||||
sliceIndex += 1
|
||||
sliceOffset = 0
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if mMap.End_of_file < int64(offset+length-1) {
|
||||
mMap.End_of_file = int64(offset + length - 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (mMap *MemoryMap) ReadMemory(offset uint64, length uint64) (dataSlice []byte, err error) {
|
||||
dataSlice = make([]byte, length)
|
||||
mBuffer, err := allocate(windows.Handle(mMap.file_memory_map_handle), offset, length, false)
|
||||
copy(dataSlice, mBuffer.Buffer)
|
||||
mBuffer.releaseMemory()
|
||||
return dataSlice, err
|
||||
}
|
||||
|
||||
func (mBuffer *MemoryBuffer) releaseMemory() {
|
||||
|
||||
windows.VirtualUnlock(mBuffer.aligned_ptr, uintptr(mBuffer.aligned_length))
|
||||
windows.UnmapViewOfFile(mBuffer.aligned_ptr)
|
||||
|
||||
currentMinWorkingSet -= mBuffer.aligned_length
|
||||
currentMaxWorkingSet -= mBuffer.aligned_length
|
||||
|
||||
if currentMinWorkingSet < maxMemoryLimitBytes {
|
||||
var _ = setProcessWorkingSetSize(uintptr(currentProcess), currentMinWorkingSet, currentMaxWorkingSet)
|
||||
}
|
||||
|
||||
mBuffer.ptr = 0
|
||||
mBuffer.aligned_ptr = 0
|
||||
mBuffer.length = 0
|
||||
mBuffer.aligned_length = 0
|
||||
mBuffer.Buffer = nil
|
||||
}
|
||||
|
||||
func allocateChunk(mMap *MemoryMap) {
|
||||
start := uint64(len(mMap.write_map_views)) * chunkSize
|
||||
mBuffer, err := allocate(windows.Handle(mMap.file_memory_map_handle), start, chunkSize, true)
|
||||
|
||||
if err == nil {
|
||||
mMap.write_map_views = append(mMap.write_map_views, mBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) (MemoryBuffer, error) {
|
||||
|
||||
mBuffer := MemoryBuffer{}
|
||||
|
||||
//align memory allocations to the minium virtal memory allocation size
|
||||
dwSysGran := systemInfo.dwAllocationGranularity
|
||||
|
||||
start := (offset / uint64(dwSysGran)) * uint64(dwSysGran)
|
||||
diff := offset - start
|
||||
aligned_length := diff + length
|
||||
|
||||
offset_high := uint32(start >> 32)
|
||||
offset_low := uint32(start & 0xFFFFFFFF)
|
||||
|
||||
access := windows.FILE_MAP_READ
|
||||
|
||||
if write {
|
||||
access = windows.FILE_MAP_WRITE
|
||||
}
|
||||
|
||||
currentMinWorkingSet += aligned_length
|
||||
currentMaxWorkingSet += aligned_length
|
||||
|
||||
if currentMinWorkingSet < maxMemoryLimitBytes {
|
||||
// increase the process working set size to hint to windows memory manager to
|
||||
// prioritise keeping this memory mapped in physical memory over other standby memory
|
||||
var _ = setProcessWorkingSetSize(uintptr(currentProcess), currentMinWorkingSet, currentMaxWorkingSet)
|
||||
}
|
||||
|
||||
addr_ptr, errno := windows.MapViewOfFile(hMapFile,
|
||||
uint32(access), // read/write permission
|
||||
offset_high,
|
||||
offset_low,
|
||||
uintptr(aligned_length))
|
||||
|
||||
if addr_ptr == 0 {
|
||||
return mBuffer, errno
|
||||
}
|
||||
|
||||
if currentMinWorkingSet < maxMemoryLimitBytes {
|
||||
windows.VirtualLock(mBuffer.aligned_ptr, uintptr(mBuffer.aligned_length))
|
||||
}
|
||||
|
||||
mBuffer.aligned_ptr = addr_ptr
|
||||
mBuffer.aligned_length = aligned_length
|
||||
mBuffer.ptr = addr_ptr + uintptr(diff)
|
||||
mBuffer.length = length
|
||||
|
||||
slice_header := (*reflect.SliceHeader)(unsafe.Pointer(&mBuffer.Buffer))
|
||||
slice_header.Data = addr_ptr + uintptr(diff)
|
||||
slice_header.Len = int(length)
|
||||
slice_header.Cap = int(length)
|
||||
|
||||
return mBuffer, nil
|
||||
}
|
||||
|
||||
//typedef struct _MEMORYSTATUSEX {
|
||||
// DWORD dwLength;
|
||||
// DWORD dwMemoryLoad;
|
||||
// DWORDLONG ullTotalPhys;
|
||||
// DWORDLONG ullAvailPhys;
|
||||
// DWORDLONG ullTotalPageFile;
|
||||
// DWORDLONG ullAvailPageFile;
|
||||
// DWORDLONG ullTotalVirtual;
|
||||
// DWORDLONG ullAvailVirtual;
|
||||
// DWORDLONG ullAvailExtendedVirtual;
|
||||
// } MEMORYSTATUSEX, *LPMEMORYSTATUSEX;
|
||||
//https://docs.microsoft.com/en-gb/windows/win32/api/sysinfoapi/ns-sysinfoapi-memorystatusex
|
||||
|
||||
type _MEMORYSTATUSEX struct {
|
||||
dwLength DWORD
|
||||
dwMemoryLoad DWORD
|
||||
ullTotalPhys DWORDLONG
|
||||
ullAvailPhys DWORDLONG
|
||||
ullTotalPageFile DWORDLONG
|
||||
ullAvailPageFile DWORDLONG
|
||||
ullTotalVirtual DWORDLONG
|
||||
ullAvailVirtual DWORDLONG
|
||||
ullAvailExtendedVirtual DWORDLONG
|
||||
}
|
||||
|
||||
// BOOL GlobalMemoryStatusEx(
|
||||
// LPMEMORYSTATUSEX lpBuffer
|
||||
// );
|
||||
// https://docs.microsoft.com/en-gb/windows/win32/api/sysinfoapi/nf-sysinfoapi-globalmemorystatusex
|
||||
func globalMemoryStatusEx() (_MEMORYSTATUSEX, error) {
|
||||
var mem_status _MEMORYSTATUSEX
|
||||
|
||||
mem_status.dwLength = uint32(unsafe.Sizeof(mem_status))
|
||||
_, _, err := procGlobalMemoryStatusEx.Call(uintptr(unsafe.Pointer(&mem_status)))
|
||||
|
||||
if err != syscall.Errno(0) {
|
||||
return mem_status, err
|
||||
}
|
||||
return mem_status, nil
|
||||
}
|
||||
|
||||
// typedef struct _SYSTEM_INFO {
|
||||
// union {
|
||||
// DWORD dwOemId;
|
||||
// struct {
|
||||
// WORD wProcessorArchitecture;
|
||||
// WORD wReserved;
|
||||
// };
|
||||
// };
|
||||
// DWORD dwPageSize;
|
||||
// LPVOID lpMinimumApplicationAddress;
|
||||
// LPVOID lpMaximumApplicationAddress;
|
||||
// DWORD_PTR dwActiveProcessorMask;
|
||||
// DWORD dwNumberOfProcessors;
|
||||
// DWORD dwProcessorType;
|
||||
// DWORD dwAllocationGranularity;
|
||||
// WORD wProcessorLevel;
|
||||
// WORD wProcessorRevision;
|
||||
// } SYSTEM_INFO;
|
||||
// https://docs.microsoft.com/en-gb/windows/win32/api/sysinfoapi/ns-sysinfoapi-system_info
|
||||
type _SYSTEM_INFO struct {
|
||||
dwOemId DWORD
|
||||
dwPageSize DWORD
|
||||
lpMinimumApplicationAddress uintptr
|
||||
lpMaximumApplicationAddress uintptr
|
||||
dwActiveProcessorMask uintptr
|
||||
dwNumberOfProcessors DWORD
|
||||
dwProcessorType DWORD
|
||||
dwAllocationGranularity DWORD
|
||||
wProcessorLevel WORD
|
||||
wProcessorRevision WORD
|
||||
}
|
||||
|
||||
// void WINAPI GetSystemInfo(
|
||||
// _Out_ LPSYSTEM_INFO lpSystemInfo
|
||||
// );
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/sysinfoapi/nf-sysinfoapi-getsysteminfo
|
||||
func getSystemInfo() (_SYSTEM_INFO, error) {
|
||||
var si _SYSTEM_INFO
|
||||
_, _, err := procGetSystemInfo.Call(uintptr(unsafe.Pointer(&si)))
|
||||
if err != syscall.Errno(0) {
|
||||
return si, err
|
||||
}
|
||||
return si, nil
|
||||
}
|
||||
|
||||
// BOOL GetProcessWorkingSetSize(
|
||||
// HANDLE hProcess,
|
||||
// PSIZE_T lpMinimumWorkingSetSize,
|
||||
// PSIZE_T lpMaximumWorkingSetSize
|
||||
// );
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getprocessworkingsetsize
|
||||
|
||||
func getProcessWorkingSetSize(process uintptr, dwMinWorkingSet *uint64, dwMaxWorkingSet *uint64) error {
|
||||
r1, _, err := syscall.Syscall(procGetProcessWorkingSetSize.Addr(), 3, process, uintptr(unsafe.Pointer(dwMinWorkingSet)), uintptr(unsafe.Pointer(dwMaxWorkingSet)))
|
||||
if r1 == 0 {
|
||||
if err != syscall.Errno(0) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BOOL SetProcessWorkingSetSize(
|
||||
// HANDLE hProcess,
|
||||
// SIZE_T dwMinimumWorkingSetSize,
|
||||
// SIZE_T dwMaximumWorkingSetSize
|
||||
// );
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setprocessworkingsetsize
|
||||
|
||||
func setProcessWorkingSetSize(process uintptr, dwMinWorkingSet uint64, dwMaxWorkingSet uint64) error {
|
||||
r1, _, err := syscall.Syscall(procSetProcessWorkingSetSize.Addr(), 3, process, uintptr(dwMinWorkingSet), uintptr(dwMaxWorkingSet))
|
||||
if r1 == 0 {
|
||||
if err != syscall.Errno(0) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -9,6 +9,7 @@ import (
|
||||
"math"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/memory_map"
|
||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
@ -29,39 +30,25 @@ func (n *Needle) DiskSize(version Version) int64 {
|
||||
return GetActualSize(n.Size, version)
|
||||
}
|
||||
|
||||
func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
|
||||
if end, e := w.Seek(0, io.SeekEnd); e == nil {
|
||||
defer func(w *os.File, off int64) {
|
||||
if err != nil {
|
||||
if te := w.Truncate(end); te != nil {
|
||||
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
|
||||
}
|
||||
}
|
||||
}(w, end)
|
||||
offset = uint64(end)
|
||||
} else {
|
||||
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
|
||||
return
|
||||
}
|
||||
func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) {
|
||||
|
||||
writeBytes := make([]byte, 0)
|
||||
|
||||
switch version {
|
||||
case Version1:
|
||||
header := make([]byte, NeedleHeaderSize)
|
||||
CookieToBytes(header[0:CookieSize], n.Cookie)
|
||||
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
|
||||
n.Size = uint32(len(n.Data))
|
||||
size = n.Size
|
||||
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
|
||||
if _, err = w.Write(header); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Data); err != nil {
|
||||
return
|
||||
}
|
||||
actualSize = NeedleHeaderSize + int64(n.Size)
|
||||
size := n.Size
|
||||
actualSize := NeedleHeaderSize + int64(n.Size)
|
||||
writeBytes = append(writeBytes, header...)
|
||||
writeBytes = append(writeBytes, n.Data...)
|
||||
padding := PaddingLength(n.Size, version)
|
||||
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
|
||||
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
|
||||
return
|
||||
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
|
||||
return writeBytes, size, actualSize, nil
|
||||
case Version2, Version3:
|
||||
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
|
||||
CookieToBytes(header[0:CookieSize], n.Cookie)
|
||||
@ -92,82 +79,101 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
|
||||
} else {
|
||||
n.Size = 0
|
||||
}
|
||||
size = n.DataSize
|
||||
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
|
||||
if _, err = w.Write(header[0:NeedleHeaderSize]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
|
||||
if n.DataSize > 0 {
|
||||
util.Uint32toBytes(header[0:4], n.DataSize)
|
||||
if _, err = w.Write(header[0:4]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Data); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:4]...)
|
||||
writeBytes = append(writeBytes, n.Data...)
|
||||
util.Uint8toBytes(header[0:1], n.Flags)
|
||||
if _, err = w.Write(header[0:1]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:1]...)
|
||||
if n.HasName() {
|
||||
util.Uint8toBytes(header[0:1], n.NameSize)
|
||||
if _, err = w.Write(header[0:1]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Name[:n.NameSize]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:1]...)
|
||||
writeBytes = append(writeBytes, n.Name[:n.NameSize]...)
|
||||
}
|
||||
if n.HasMime() {
|
||||
util.Uint8toBytes(header[0:1], n.MimeSize)
|
||||
if _, err = w.Write(header[0:1]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Mime); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:1]...)
|
||||
writeBytes = append(writeBytes, n.Mime...)
|
||||
}
|
||||
if n.HasLastModifiedDate() {
|
||||
util.Uint64toBytes(header[0:8], n.LastModified)
|
||||
if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...)
|
||||
}
|
||||
if n.HasTtl() && n.Ttl != nil {
|
||||
n.Ttl.ToBytes(header[0:TtlBytesLength])
|
||||
if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:TtlBytesLength]...)
|
||||
}
|
||||
if n.HasPairs() {
|
||||
util.Uint16toBytes(header[0:2], n.PairsSize)
|
||||
if _, err = w.Write(header[0:2]); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err = w.Write(n.Pairs); err != nil {
|
||||
return
|
||||
}
|
||||
writeBytes = append(writeBytes, header[0:2]...)
|
||||
writeBytes = append(writeBytes, n.Pairs...)
|
||||
}
|
||||
}
|
||||
padding := PaddingLength(n.Size, version)
|
||||
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
|
||||
if version == Version2 {
|
||||
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
|
||||
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
|
||||
} else {
|
||||
// version3
|
||||
util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
|
||||
_, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
|
||||
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
|
||||
}
|
||||
|
||||
return offset, n.DataSize, GetActualSize(n.Size, version), err
|
||||
return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil
|
||||
}
|
||||
return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||
|
||||
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||
}
|
||||
|
||||
func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
|
||||
|
||||
mMap, exists := memory_map.FileMemoryMap[w.Name()]
|
||||
if !exists {
|
||||
if end, e := w.Seek(0, io.SeekEnd); e == nil {
|
||||
defer func(w *os.File, off int64) {
|
||||
if err != nil {
|
||||
if te := w.Truncate(end); te != nil {
|
||||
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
|
||||
}
|
||||
}
|
||||
}(w, end)
|
||||
offset = uint64(end)
|
||||
} else {
|
||||
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
offset = uint64(mMap.End_of_file + 1)
|
||||
}
|
||||
|
||||
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
|
||||
|
||||
if err == nil {
|
||||
if exists {
|
||||
mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite)
|
||||
} else {
|
||||
_, err = w.Write(bytesToWrite)
|
||||
}
|
||||
}
|
||||
|
||||
return offset, size, actualSize, err
|
||||
}
|
||||
|
||||
func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
|
||||
dataSlice = make([]byte, int(GetActualSize(size, version)))
|
||||
_, err = r.ReadAt(dataSlice, offset)
|
||||
return dataSlice, err
|
||||
|
||||
dataSize := GetActualSize(size, version)
|
||||
dataSlice = make([]byte, int(dataSize))
|
||||
|
||||
mMap, exists := memory_map.FileMemoryMap[r.Name()]
|
||||
if exists {
|
||||
dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize))
|
||||
return dataSlice, err
|
||||
} else {
|
||||
_, err = r.ReadAt(dataSlice, offset)
|
||||
return dataSlice, err
|
||||
}
|
||||
}
|
||||
|
||||
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
|
||||
@ -280,14 +286,24 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt
|
||||
n = new(Needle)
|
||||
if version == Version1 || version == Version2 || version == Version3 {
|
||||
bytes = make([]byte, NeedleHeaderSize)
|
||||
var count int
|
||||
count, err = r.ReadAt(bytes, offset)
|
||||
if count <= 0 || err != nil {
|
||||
return nil, bytes, 0, err
|
||||
|
||||
mMap, exists := memory_map.FileMemoryMap[r.Name()]
|
||||
if exists {
|
||||
bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize)
|
||||
if err != nil {
|
||||
return nil, bytes, 0, err
|
||||
}
|
||||
} else {
|
||||
var count int
|
||||
count, err = r.ReadAt(bytes, offset)
|
||||
if count <= 0 || err != nil {
|
||||
return nil, bytes, 0, err
|
||||
}
|
||||
}
|
||||
n.ParseNeedleHeader(bytes)
|
||||
bodyLength = NeedleBodyLength(n.Size, version)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
14
weed/storage/needle/volume_memory_map_max_size.go
Normal file
14
weed/storage/needle/volume_memory_map_max_size.go
Normal file
@ -0,0 +1,14 @@
|
||||
package needle
|
||||
|
||||
import "strconv"
|
||||
|
||||
func ReadMemoryMapMaxSizeMB(MemoryMapMaxSizeMBString string) (uint32, error) {
|
||||
if MemoryMapMaxSizeMBString == "" {
|
||||
return 0, nil
|
||||
}
|
||||
var MemoryMapMaxSize64 uint64
|
||||
var err error
|
||||
MemoryMapMaxSize64, err = strconv.ParseUint(MemoryMapMaxSizeMBString, 10, 32)
|
||||
MemoryMapMaxSize := uint32(MemoryMapMaxSize64)
|
||||
return MemoryMapMaxSize, err
|
||||
}
|
10
weed/storage/needle/volume_memory_map_max_size_test.go
Normal file
10
weed/storage/needle/volume_memory_map_max_size_test.go
Normal file
@ -0,0 +1,10 @@
|
||||
package needle
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestMemoryMapMaxSizeReadWrite(t *testing.T) {
|
||||
memoryMapSize, _ := ReadMemoryMapMaxSizeMB("5000")
|
||||
if memoryMapSize != 5000 {
|
||||
t.Errorf("empty memoryMapSize:%v", memoryMapSize)
|
||||
}
|
||||
}
|
@ -59,7 +59,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
|
||||
|
||||
return
|
||||
}
|
||||
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
|
||||
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, memoryMapMaxSizeMB uint32) error {
|
||||
rt, e := NewReplicaPlacementFromString(replicaPlacement)
|
||||
if e != nil {
|
||||
return e
|
||||
@ -68,7 +68,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate)
|
||||
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, memoryMapMaxSizeMB)
|
||||
return e
|
||||
}
|
||||
func (s *Store) DeleteCollection(collection string) (e error) {
|
||||
@ -101,14 +101,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
|
||||
}
|
||||
return ret
|
||||
}
|
||||
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) error {
|
||||
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMB uint32) error {
|
||||
if s.findVolume(vid) != nil {
|
||||
return fmt.Errorf("Volume Id %d already exists!", vid)
|
||||
}
|
||||
if location := s.FindFreeLocation(); location != nil {
|
||||
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
|
||||
location.Directory, vid, collection, replicaPlacement, ttl)
|
||||
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {
|
||||
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMB); err == nil {
|
||||
location.SetVolume(vid, volume)
|
||||
glog.V(0).Infof("add volume %d", vid)
|
||||
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
|
||||
|
@ -18,13 +18,14 @@ import (
|
||||
)
|
||||
|
||||
type Volume struct {
|
||||
Id needle.VolumeId
|
||||
dir string
|
||||
Collection string
|
||||
dataFile *os.File
|
||||
nm NeedleMapper
|
||||
needleMapKind NeedleMapType
|
||||
readOnly bool
|
||||
Id needle.VolumeId
|
||||
dir string
|
||||
Collection string
|
||||
dataFile *os.File
|
||||
nm NeedleMapper
|
||||
needleMapKind NeedleMapType
|
||||
readOnly bool
|
||||
MemoryMapMaxSizeMB uint32
|
||||
|
||||
SuperBlock
|
||||
|
||||
@ -38,9 +39,9 @@ type Volume struct {
|
||||
isCompacting bool
|
||||
}
|
||||
|
||||
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) {
|
||||
func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMB uint32) (v *Volume, e error) {
|
||||
// if replicaPlacement is nil, the superblock will be loaded from disk
|
||||
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
||||
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMB: memoryMapMaxSizeMB}
|
||||
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
|
||||
v.needleMapKind = needleMapKind
|
||||
e = v.load(true, true, needleMapKind, preallocate)
|
||||
|
@ -1,4 +1,4 @@
|
||||
// +build !linux
|
||||
// +build !linux,!windows
|
||||
|
||||
package storage
|
||||
|
||||
@ -8,8 +8,8 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
)
|
||||
|
||||
func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) {
|
||||
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
|
||||
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if preallocate > 0 {
|
||||
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
|
||||
}
|
||||
|
@ -9,8 +9,8 @@ import (
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
)
|
||||
|
||||
func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) {
|
||||
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
|
||||
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if preallocate != 0 {
|
||||
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
|
||||
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
|
||||
|
38
weed/storage/volume_create_windows.go
Normal file
38
weed/storage/volume_create_windows.go
Normal file
@ -0,0 +1,38 @@
|
||||
// +build windows
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/memory_map"
|
||||
"golang.org/x/sys/windows"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/os_overloads"
|
||||
)
|
||||
|
||||
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
|
||||
|
||||
mMap, exists := memory_map.FileMemoryMap[fileName]
|
||||
if !exists {
|
||||
|
||||
if preallocate > 0 {
|
||||
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
|
||||
}
|
||||
|
||||
if memoryMapSizeMB > 0 {
|
||||
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
|
||||
memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap)
|
||||
|
||||
new_mMap := memory_map.FileMemoryMap[fileName]
|
||||
new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB))
|
||||
return file, e
|
||||
} else {
|
||||
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
|
||||
return file, e
|
||||
}
|
||||
} else {
|
||||
return mMap.File, nil
|
||||
}
|
||||
}
|
@ -42,7 +42,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
}
|
||||
} else {
|
||||
if createDatIfMissing {
|
||||
v.dataFile, e = createVolumeFile(fileName+".dat", preallocate)
|
||||
v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMB)
|
||||
} else {
|
||||
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/memory_map"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
. "github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||
)
|
||||
@ -44,6 +45,12 @@ func (v *Volume) Destroy() (err error) {
|
||||
err = fmt.Errorf("volume %d is compacting", v.Id)
|
||||
return
|
||||
}
|
||||
mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
||||
if exists {
|
||||
mMap.DeleteFileAndMemoryMap()
|
||||
delete(memory_map.FileMemoryMap, v.dataFile.Name())
|
||||
}
|
||||
|
||||
v.Close()
|
||||
os.Remove(v.FileName() + ".dat")
|
||||
os.Remove(v.FileName() + ".idx")
|
||||
|
@ -4,11 +4,13 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/memory_map"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -74,24 +76,34 @@ func (s *SuperBlock) Initialized() bool {
|
||||
}
|
||||
|
||||
func (v *Volume) maybeWriteSuperBlock() error {
|
||||
stat, e := v.dataFile.Stat()
|
||||
if e != nil {
|
||||
glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e)
|
||||
return e
|
||||
}
|
||||
if stat.Size() == 0 {
|
||||
v.SuperBlock.version = needle.CurrentVersion
|
||||
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
|
||||
if e != nil && os.IsPermission(e) {
|
||||
//read-only, but zero length - recreate it!
|
||||
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
|
||||
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
|
||||
v.readOnly = false
|
||||
|
||||
mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
|
||||
if exists {
|
||||
if mMap.End_of_file == -1 {
|
||||
v.SuperBlock.version = needle.CurrentVersion
|
||||
mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
stat, e := v.dataFile.Stat()
|
||||
if e != nil {
|
||||
glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e)
|
||||
return e
|
||||
}
|
||||
if stat.Size() == 0 {
|
||||
v.SuperBlock.version = needle.CurrentVersion
|
||||
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
|
||||
if e != nil && os.IsPermission(e) {
|
||||
//read-only, but zero length - recreate it!
|
||||
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
|
||||
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
|
||||
v.readOnly = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func (v *Volume) readSuperBlock() (err error) {
|
||||
@ -101,15 +113,26 @@ func (v *Volume) readSuperBlock() (err error) {
|
||||
|
||||
// ReadSuperBlock reads from data file and load it into volume's super block
|
||||
func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
|
||||
if _, err = dataFile.Seek(0, 0); err != nil {
|
||||
err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err)
|
||||
return
|
||||
}
|
||||
|
||||
header := make([]byte, _SuperBlockSize)
|
||||
if _, e := dataFile.Read(header); e != nil {
|
||||
err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
|
||||
return
|
||||
mMap, exists := memory_map.FileMemoryMap[dataFile.Name()]
|
||||
if exists {
|
||||
header, err = mMap.ReadMemory(0, _SuperBlockSize)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if _, err = dataFile.Seek(0, 0); err != nil {
|
||||
err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err)
|
||||
return
|
||||
}
|
||||
if _, e := dataFile.Read(header); e != nil {
|
||||
err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
superBlock.version = needle.Version(header[0])
|
||||
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
|
||||
err = fmt.Errorf("cannot read replica type: %s", err.Error())
|
||||
|
@ -22,86 +22,98 @@ func (v *Volume) garbageLevel() float64 {
|
||||
}
|
||||
|
||||
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
|
||||
glog.V(3).Infof("Compacting volume %d ...", v.Id)
|
||||
//no need to lock for copy on write
|
||||
//v.accessLock.Lock()
|
||||
//defer v.accessLock.Unlock()
|
||||
//glog.V(3).Infof("Got Compaction lock...")
|
||||
v.isCompacting = true
|
||||
defer func() {
|
||||
v.isCompacting = false
|
||||
}()
|
||||
|
||||
filePath := v.FileName()
|
||||
v.lastCompactIndexOffset = v.IndexFileSize()
|
||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
||||
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
|
||||
if v.MemoryMapMaxSizeMB > 0 { //it makes no sense to compact in memory
|
||||
glog.V(3).Infof("Compacting volume %d ...", v.Id)
|
||||
//no need to lock for copy on write
|
||||
//v.accessLock.Lock()
|
||||
//defer v.accessLock.Unlock()
|
||||
//glog.V(3).Infof("Got Compaction lock...")
|
||||
v.isCompacting = true
|
||||
defer func() {
|
||||
v.isCompacting = false
|
||||
}()
|
||||
|
||||
filePath := v.FileName()
|
||||
v.lastCompactIndexOffset = v.IndexFileSize()
|
||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
||||
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (v *Volume) Compact2() error {
|
||||
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
|
||||
|
||||
v.isCompacting = true
|
||||
defer func() {
|
||||
v.isCompacting = false
|
||||
}()
|
||||
if v.MemoryMapMaxSizeMB > 0 { //it makes no sense to compact in memory
|
||||
glog.V(3).Infof("Compact2 volume %d ...", v.Id)
|
||||
|
||||
filePath := v.FileName()
|
||||
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
||||
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
|
||||
v.isCompacting = true
|
||||
defer func() {
|
||||
v.isCompacting = false
|
||||
}()
|
||||
|
||||
filePath := v.FileName()
|
||||
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
|
||||
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (v *Volume) CommitCompact() error {
|
||||
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
|
||||
if v.MemoryMapMaxSizeMB > 0 { //it makes no sense to compact in memory
|
||||
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
|
||||
|
||||
v.isCompacting = true
|
||||
defer func() {
|
||||
v.isCompacting = false
|
||||
}()
|
||||
v.isCompacting = true
|
||||
defer func() {
|
||||
v.isCompacting = false
|
||||
}()
|
||||
|
||||
v.dataFileAccessLock.Lock()
|
||||
defer v.dataFileAccessLock.Unlock()
|
||||
v.dataFileAccessLock.Lock()
|
||||
defer v.dataFileAccessLock.Unlock()
|
||||
|
||||
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
|
||||
v.nm.Close()
|
||||
if err := v.dataFile.Close(); err != nil {
|
||||
glog.V(0).Infof("fail to close volume %d", v.Id)
|
||||
}
|
||||
v.dataFile = nil
|
||||
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
|
||||
|
||||
var e error
|
||||
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
|
||||
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
|
||||
e = os.Remove(v.FileName() + ".cpd")
|
||||
if e != nil {
|
||||
return e
|
||||
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
|
||||
v.nm.Close()
|
||||
if err := v.dataFile.Close(); err != nil {
|
||||
glog.V(0).Infof("fail to close volume %d", v.Id)
|
||||
}
|
||||
e = os.Remove(v.FileName() + ".cpx")
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
} else {
|
||||
v.dataFile = nil
|
||||
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
|
||||
|
||||
var e error
|
||||
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
|
||||
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
|
||||
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
|
||||
e = os.Remove(v.FileName() + ".cpd")
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
e = os.Remove(v.FileName() + ".cpx")
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
} else {
|
||||
var e error
|
||||
if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
|
||||
}
|
||||
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
|
||||
}
|
||||
}
|
||||
if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
|
||||
return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
|
||||
|
||||
//glog.V(3).Infof("Pretending to be vacuuming...")
|
||||
//time.Sleep(20 * time.Second)
|
||||
|
||||
os.RemoveAll(v.FileName() + ".ldb")
|
||||
os.RemoveAll(v.FileName() + ".bdb")
|
||||
|
||||
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
|
||||
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
//glog.V(3).Infof("Pretending to be vacuuming...")
|
||||
//time.Sleep(20 * time.Second)
|
||||
|
||||
os.RemoveAll(v.FileName() + ".ldb")
|
||||
os.RemoveAll(v.FileName() + ".bdb")
|
||||
|
||||
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
|
||||
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -299,7 +311,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
||||
var (
|
||||
dst, idx *os.File
|
||||
)
|
||||
if dst, err = createVolumeFile(dstName, preallocate); err != nil {
|
||||
if dst, err = createVolumeFile(dstName, preallocate, v.MemoryMapMaxSizeMB); err != nil {
|
||||
return
|
||||
}
|
||||
defer dst.Close()
|
||||
|
@ -68,7 +68,7 @@ func TestCompaction(t *testing.T) {
|
||||
}
|
||||
defer os.RemoveAll(dir) // clean up
|
||||
|
||||
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0)
|
||||
v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
@ -95,7 +95,7 @@ func TestCompaction(t *testing.T) {
|
||||
|
||||
v.Close()
|
||||
|
||||
v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0)
|
||||
v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume reloading: %v", err)
|
||||
}
|
||||
|
@ -18,11 +18,12 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol
|
||||
return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
|
||||
VolumeId: uint32(vid),
|
||||
Collection: option.Collection,
|
||||
Replication: option.ReplicaPlacement.String(),
|
||||
Ttl: option.Ttl.String(),
|
||||
Preallocate: option.Prealloacte,
|
||||
VolumeId: uint32(vid),
|
||||
Collection: option.Collection,
|
||||
Replication: option.ReplicaPlacement.String(),
|
||||
Ttl: option.Ttl.String(),
|
||||
Preallocate: option.Prealloacte,
|
||||
MemoryMapMaxSizeMB: option.MemoryMapMaxSizeMB,
|
||||
})
|
||||
return deleteErr
|
||||
})
|
||||
|
@ -21,13 +21,14 @@ This package is created to resolve these replica placement issues:
|
||||
*/
|
||||
|
||||
type VolumeGrowOption struct {
|
||||
Collection string
|
||||
ReplicaPlacement *storage.ReplicaPlacement
|
||||
Ttl *needle.TTL
|
||||
Prealloacte int64
|
||||
DataCenter string
|
||||
Rack string
|
||||
DataNode string
|
||||
Collection string
|
||||
ReplicaPlacement *storage.ReplicaPlacement
|
||||
Ttl *needle.TTL
|
||||
Prealloacte int64
|
||||
DataCenter string
|
||||
Rack string
|
||||
DataNode string
|
||||
MemoryMapMaxSizeMB uint32
|
||||
}
|
||||
|
||||
type VolumeGrowth struct {
|
||||
|
Loading…
Reference in New Issue
Block a user