diff --git a/weed/storage/backend/memory_map/memory_map.go b/weed/storage/backend/memory_map/memory_map.go index e940fcc0e..5dc7ba33d 100644 --- a/weed/storage/backend/memory_map/memory_map.go +++ b/weed/storage/backend/memory_map/memory_map.go @@ -21,8 +21,6 @@ type MemoryMap struct { End_of_file int64 } -var FileMemoryMap = make(map[string]*MemoryMap) - func ReadMemoryMapMaxSizeMb(memoryMapMaxSizeMbString string) (uint32, error) { if memoryMapMaxSizeMbString == "" { return 0, nil diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go new file mode 100644 index 000000000..d999b917e --- /dev/null +++ b/weed/storage/backend/memory_map/memory_map_backend.go @@ -0,0 +1,60 @@ +package memory_map + +import ( + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/storage/backend" +) + +var ( + _ backend.DataStorageBackend = &MemoryMappedFile{} +) + +type MemoryMappedFile struct { + mm *MemoryMap +} + +func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile { + mmf := &MemoryMappedFile{ + mm : new(MemoryMap), + } + mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB)) + return mmf +} + +func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) { + readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p))) + if e != nil { + return 0, e + } + // TODO avoid the extra copy + copy(p, readBytes) + return len(readBytes), nil +} + +func (mmf *MemoryMappedFile) WriteAt(p []byte, off int64) (n int, err error) { + mmf.mm.WriteMemory(uint64(off), uint64(len(p)), p) + return len(p), nil +} + +func (mmf *MemoryMappedFile) Truncate(off int64) error { + return nil +} + +func (mmf *MemoryMappedFile) Close() error { + mmf.mm.DeleteFileAndMemoryMap() + return nil +} + +func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err error) { + stat, e := mmf.mm.File.Stat() + if e == nil { + return stat.Size(), stat.ModTime(), nil + } + return 0, time.Time{}, err +} + +func (mmf *MemoryMappedFile) String() string { + return mmf.mm.File.Name() +} diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 353a3c78b..8e5d18b1a 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -8,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -128,33 +127,24 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) { - mMap, exists := memory_map.FileMemoryMap[w.String()] - if !exists { - if end, _, e := w.GetStat(); e == nil { - defer func(w backend.DataStorageBackend, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te) - } + if end, _, e := w.GetStat(); e == nil { + defer func(w backend.DataStorageBackend, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te) } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) - return - } + } + }(w, end) + offset = uint64(end) } else { - offset = uint64(mMap.End_of_file + 1) + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return } bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version) if err == nil { - if exists { - mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite) - } else { - _, err = w.WriteAt(bytesToWrite, int64(offset)) - } + _, err = w.WriteAt(bytesToWrite, int64(offset)) } return offset, size, actualSize, err @@ -165,14 +155,9 @@ func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, ver dataSize := GetActualSize(size, version) dataSlice = make([]byte, int(dataSize)) - mMap, exists := memory_map.FileMemoryMap[r.String()] - if exists { - dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize)) - return dataSlice, err - } else { - _, err = r.ReadAt(dataSlice, offset) - return dataSlice, err - } + _, err = r.ReadAt(dataSlice, offset) + return dataSlice, err + } // ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set. @@ -286,19 +271,12 @@ func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int6 if version == Version1 || version == Version2 || version == Version3 { bytes = make([]byte, NeedleHeaderSize) - mMap, exists := memory_map.FileMemoryMap[r.String()] - if exists { - bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize) - if err != nil { - return nil, bytes, 0, err - } - } else { - var count int - count, err = r.ReadAt(bytes, offset) - if count <= 0 || err != nil { - return nil, bytes, 0, err - } + var count int + count, err = r.ReadAt(bytes, offset) + if count <= 0 || err != nil { + return nil, bytes, 0, err } + n.ParseNeedleHeader(bytes) bodyLength = NeedleBodyLength(n.Size, version) } diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go index ef58e5871..b27a62990 100644 --- a/weed/storage/volume_create.go +++ b/weed/storage/volume_create.go @@ -6,12 +6,13 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } - return file, e + return backend.NewDiskFile(file), e } diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go index d9dfc3862..e3305d991 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/volume_create_linux.go @@ -7,13 +7,14 @@ import ( "syscall" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) } - return file, e + return backend.NewDiskFile(file), e } diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go index 12826f613..81536810b 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/volume_create_windows.go @@ -3,36 +3,26 @@ package storage import ( - "os" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "golang.org/x/sys/windows" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { - mMap, exists := memory_map.FileMemoryMap[fileName] - if !exists { - - if preallocate > 0 { - glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) - } - - if memoryMapSizeMB > 0 { - file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) - memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap) - - new_mMap := memory_map.FileMemoryMap[fileName] - new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB)) - return file, e - } else { - file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) - return file, e - } - } else { - return mMap.File, nil + if preallocate > 0 { + glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } + + if memoryMapSizeMB > 0 { + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) + return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e + } else { + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) + return backend.NewDiskFile(file), e + } + } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index e769321c4..6f1d8fe40 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -25,13 +25,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind var e error fileName := v.FileName() alreadyHasSuperBlock := false - var dataFile *os.File // open dat file if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } + var dataFile *os.File if canWrite { dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) } else { @@ -43,14 +43,14 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if fileSize >= _SuperBlockSize { alreadyHasSuperBlock = true } + v.DataBackend = backend.NewDiskFile(dataFile) } else { if createDatIfMissing { - dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb) + v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb) } else { return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) } } - v.DataBackend = backend.NewDiskFile(dataFile) if e != nil { if !os.IsPermission(e) { diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index f2b17a827..d9b79795b 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -10,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -46,12 +45,6 @@ func (v *Volume) Destroy() (err error) { err = fmt.Errorf("volume %d is compacting", v.Id) return } - mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()] - if exists { - mMap.DeleteFileAndMemoryMap() - delete(memory_map.FileMemoryMap, v.DataBackend.String()) - } - v.Close() os.Remove(v.FileName() + ".dat") os.Remove(v.FileName() + ".idx") diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 6faf34ef3..bce5af465 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -4,11 +4,9 @@ import ( "fmt" "os" - "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -78,35 +76,26 @@ func (s *SuperBlock) Initialized() bool { func (v *Volume) maybeWriteSuperBlock() error { - mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()] - if exists { - if mMap.End_of_file == -1 { - v.SuperBlock.version = needle.CurrentVersion - mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes()) - } - return nil - } else { - datSize, _, e := v.DataBackend.GetStat() - if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e) - return e - } - if datSize == 0 { - v.SuperBlock.version = needle.CurrentVersion - _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) - if e != nil && os.IsPermission(e) { - //read-only, but zero length - recreate it! - var dataFile *os.File - if dataFile, e = os.Create(v.DataBackend.String()); e == nil { - v.DataBackend = backend.NewDiskFile(dataFile) - if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil { - v.readOnly = false - } + datSize, _, e := v.DataBackend.GetStat() + if e != nil { + glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e) + return e + } + if datSize == 0 { + v.SuperBlock.version = needle.CurrentVersion + _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it! + var dataFile *os.File + if dataFile, e = os.Create(v.DataBackend.String()); e == nil { + v.DataBackend = backend.NewDiskFile(dataFile) + if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil { + v.readOnly = false } } } - return e } + return e } func (v *Volume) readSuperBlock() (err error) { @@ -118,18 +107,9 @@ func (v *Volume) readSuperBlock() (err error) { func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) { header := make([]byte, _SuperBlockSize) - mMap, exists := memory_map.FileMemoryMap[datBackend.String()] - if exists { - header, err = mMap.ReadMemory(0, _SuperBlockSize) - if err != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), err) - return - } - } else { - if _, e := datBackend.ReadAt(header, 0); e != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e) - return - } + if _, e := datBackend.ReadAt(header, 0); e != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e) + return } superBlock.version = needle.Version(header[0]) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index e5d932c9e..e90746b54 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -312,7 +312,8 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { var ( - dst, idx *os.File + dst backend.DataStorageBackend + idx *os.File ) if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil { return @@ -328,7 +329,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca v: v, now: uint64(time.Now().Unix()), nm: NewBtreeNeedleMap(idx), - dstBackend: backend.NewDiskFile(dst), + dstBackend: dst, writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)