diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index f74191742..0bfa12180 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -2,15 +2,13 @@ package storage
 
 import (
 	"fmt"
-	"io"
 	"os"
 
-	"github.com/chrislusf/weed-fs/go/glog"
 	"github.com/chrislusf/weed-fs/go/util"
 )
 
 type NeedleMapper interface {
-	Put(key uint64, offset uint32, size uint32) (int, error)
+	Put(key uint64, offset uint32, size uint32) error
 	Get(key uint64) (element *NeedleValue, ok bool)
 	Delete(key uint64) error
 	Close()
@@ -19,7 +17,6 @@ type NeedleMapper interface {
 	DeletedSize() uint64
 	FileCount() int
 	DeletedCount() int
-	Visit(visit func(NeedleValue) error) (err error)
 	MaxFileKey() uint64
 }
 
@@ -31,146 +28,33 @@ type mapMetric struct {
 	MaximumFileKey uint64 `json:"MaxFileKey"`
 }
 
-type NeedleMap struct {
-	indexFile *os.File
-	m         CompactMap
-
-	mapMetric
-}
-
-func NewNeedleMap(file *os.File) *NeedleMap {
-	nm := &NeedleMap{
-		m:         NewCompactMap(),
-		indexFile: file,
-	}
-	return nm
-}
-
-const (
-	RowsToRead = 1024
-)
-
-func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
-	nm := NewNeedleMap(file)
-	e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
-		if key > nm.MaximumFileKey {
-			nm.MaximumFileKey = key
-		}
-		nm.FileCounter++
-		nm.FileByteCounter = nm.FileByteCounter + uint64(size)
-		if offset > 0 {
-			oldSize := nm.m.Set(Key(key), offset, size)
-			glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
-			if oldSize > 0 {
-				nm.DeletionCounter++
-				nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
-			}
-		} else {
-			oldSize := nm.m.Delete(Key(key))
-			glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
-			nm.DeletionCounter++
-			nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
-		}
-		return nil
-	})
-	glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
-	return nm, e
-}
-
-// walks through the index file, calls fn function with each key, offset, size
-// stops with the error returned by the fn function
-func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
-	var readerOffset int64
-	bytes := make([]byte, 16*RowsToRead)
-	count, e := r.ReadAt(bytes, readerOffset)
-	glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
-	readerOffset += int64(count)
-	var (
-		key          uint64
-		offset, size uint32
-		i            int
-	)
-
-	for count > 0 && e == nil || e == io.EOF {
-		for i = 0; i+16 <= count; i += 16 {
-			key = util.BytesToUint64(bytes[i : i+8])
-			offset = util.BytesToUint32(bytes[i+8 : i+12])
-			size = util.BytesToUint32(bytes[i+12 : i+16])
-			if e = fn(key, offset, size); e != nil {
-				return e
-			}
-		}
-		if e == io.EOF {
-			return nil
-		}
-		count, e = r.ReadAt(bytes, readerOffset)
-		glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
-		readerOffset += int64(count)
-	}
-	return e
-}
-
-func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
-	if key > nm.MaximumFileKey {
-		nm.MaximumFileKey = key
-	}
-	oldSize := nm.m.Set(Key(key), offset, size)
+func appendToIndexFile(indexFile *os.File,
+	key uint64, offset uint32, size uint32) error {
 	bytes := make([]byte, 16)
 	util.Uint64toBytes(bytes[0:8], key)
 	util.Uint32toBytes(bytes[8:12], offset)
 	util.Uint32toBytes(bytes[12:16], size)
-	nm.FileCounter++
-	nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+	if _, err := indexFile.Seek(0, 2); err != nil {
+		return fmt.Errorf("cannot seek end of indexfile %s: %v",
+			indexFile.Name(), err)
+	}
+	_, err := indexFile.Write(bytes)
+	return err
+}
+
+func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+	mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
+	mm.DeletionCounter++
+}
+
+func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
+	if key > mm.MaximumFileKey {
+		mm.MaximumFileKey = key
+	}
+	mm.FileCounter++
+	mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
 	if oldSize > 0 {
-		nm.DeletionCounter++
-		nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+		mm.DeletionCounter++
+		mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
 	}
-	if _, err := nm.indexFile.Seek(0, 2); err != nil {
-		return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
-	}
-	return nm.indexFile.Write(bytes)
-}
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
-	element, ok = nm.m.Get(Key(key))
-	return
-}
-func (nm *NeedleMap) Delete(key uint64) error {
-	nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key)))
-	bytes := make([]byte, 16)
-	util.Uint64toBytes(bytes[0:8], key)
-	util.Uint32toBytes(bytes[8:12], 0)
-	util.Uint32toBytes(bytes[12:16], 0)
-	if _, err := nm.indexFile.Seek(0, 2); err != nil {
-		return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err)
-	}
-	if _, err := nm.indexFile.Write(bytes); err != nil {
-		return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err)
-	}
-	nm.DeletionCounter++
-	return nil
-}
-func (nm *NeedleMap) Close() {
-	_ = nm.indexFile.Close()
-}
-func (nm *NeedleMap) Destroy() error {
-	nm.Close()
-	return os.Remove(nm.indexFile.Name())
-}
-func (nm NeedleMap) ContentSize() uint64 {
-	return nm.FileByteCounter
-}
-func (nm NeedleMap) DeletedSize() uint64 {
-	return nm.DeletionByteCounter
-}
-func (nm NeedleMap) FileCount() int {
-	return nm.FileCounter
-}
-func (nm NeedleMap) DeletedCount() int {
-	return nm.DeletionCounter
-}
-func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
-	return nm.m.Visit(visit)
-}
-func (nm NeedleMap) MaxFileKey() uint64 {
-	return nm.MaximumFileKey
 }
diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go
new file mode 100644
index 000000000..73595278d
--- /dev/null
+++ b/go/storage/needle_map_leveldb.go
@@ -0,0 +1,151 @@
+package storage
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/chrislusf/weed-fs/go/glog"
+	"github.com/chrislusf/weed-fs/go/util"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+type LevelDbNeedleMap struct {
+	dbFileName string
+	indexFile  *os.File
+	db         *leveldb.DB
+	mapMetric
+}
+
+func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
+	m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
+	if !isLevelDbFresh(dbFileName, indexFile) {
+		glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+		generateDbFile(dbFileName, indexFile)
+		glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+	}
+	glog.V(1).Infof("Opening %s...", dbFileName)
+	if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil {
+		return
+	}
+	glog.V(1).Infof("Loading %s...", indexFile.Name())
+	nm, indexLoadError := LoadNeedleMap(indexFile)
+	if indexLoadError != nil {
+		return nil, indexLoadError
+	}
+	m.mapMetric = nm.mapMetric
+	return
+}
+
+func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
+	// normally we always write to index file first
+	dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
+	if err != nil {
+		return false
+	}
+	defer dbLogFile.Close()
+	dbStat, dbStatErr := dbLogFile.Stat()
+	indexStat, indexStatErr := indexFile.Stat()
+	if dbStatErr != nil || indexStatErr != nil {
+		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+		return false
+	}
+
+	return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func generateDbFile(dbFileName string, indexFile *os.File) error {
+	db, err := leveldb.OpenFile(dbFileName, nil)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+	return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
+		if offset > 0 {
+			levelDbWrite(db, key, offset, size)
+		} else {
+			levelDbDelete(db, key)
+		}
+		return nil
+	})
+}
+
+func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+	bytes := make([]byte, 8)
+	util.Uint64toBytes(bytes, key)
+	data, err := m.db.Get(bytes, nil)
+	if err != nil || len(data) != 8 {
+		glog.V(0).Infof("Failed to get %d %v", key, err)
+		return nil, false
+	}
+	offset := util.BytesToUint32(data[0:4])
+	size := util.BytesToUint32(data[4:8])
+	return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true
+}
+
+func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
+	var oldSize uint32
+	if oldNeedle, ok := m.Get(key); ok {
+		oldSize = oldNeedle.Size
+	}
+	m.logPut(key, oldSize, size)
+	// write to index file first
+	if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
+		return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
+	}
+	return levelDbWrite(m.db, key, offset, size)
+}
+
+func levelDbWrite(db *leveldb.DB,
+	key uint64, offset uint32, size uint32) error {
+	bytes := make([]byte, 16)
+	util.Uint64toBytes(bytes[0:8], key)
+	util.Uint32toBytes(bytes[8:12], offset)
+	util.Uint32toBytes(bytes[12:16], size)
+	if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
+		return fmt.Errorf("failed to write leveldb: %v", err)
+	}
+	return nil
+}
+func levelDbDelete(db *leveldb.DB, key uint64) error {
+	bytes := make([]byte, 8)
+	util.Uint64toBytes(bytes, key)
+	return db.Delete(bytes, nil)
+}
+
+func (m *LevelDbNeedleMap) Delete(key uint64) error {
+	if oldNeedle, ok := m.Get(key); ok {
+		m.logDelete(oldNeedle.Size)
+	}
+	// write to index file first
+	if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
+		return err
+	}
+	return levelDbDelete(m.db, key)
+}
+
+func (m *LevelDbNeedleMap) Close() {
+	m.db.Close()
+}
+
+func (m *LevelDbNeedleMap) Destroy() error {
+	m.Close()
+	os.Remove(m.indexFile.Name())
+	return os.Remove(m.dbFileName)
+}
+
+func (m *LevelDbNeedleMap) ContentSize() uint64 {
+	return m.FileByteCounter
+}
+func (m *LevelDbNeedleMap) DeletedSize() uint64 {
+	return m.DeletionByteCounter
+}
+func (m *LevelDbNeedleMap) FileCount() int {
+	return m.FileCounter
+}
+func (m *LevelDbNeedleMap) DeletedCount() int {
+	return m.DeletionCounter
+}
+func (m *LevelDbNeedleMap) MaxFileKey() uint64 {
+	return m.MaximumFileKey
+}
diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go
new file mode 100644
index 000000000..5fce301bc
--- /dev/null
+++ b/go/storage/needle_map_memory.go
@@ -0,0 +1,126 @@
+package storage
+
+import (
+	"io"
+	"os"
+
+	"github.com/chrislusf/weed-fs/go/glog"
+	"github.com/chrislusf/weed-fs/go/util"
+)
+
+type NeedleMap struct {
+	indexFile *os.File
+	m         CompactMap
+
+	mapMetric
+}
+
+func NewNeedleMap(file *os.File) *NeedleMap {
+	nm := &NeedleMap{
+		m:         NewCompactMap(),
+		indexFile: file,
+	}
+	return nm
+}
+
+const (
+	RowsToRead = 1024
+)
+
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
+	nm := NewNeedleMap(file)
+	e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
+		if key > nm.MaximumFileKey {
+			nm.MaximumFileKey = key
+		}
+		nm.FileCounter++
+		nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+		if offset > 0 {
+			oldSize := nm.m.Set(Key(key), offset, size)
+			glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+			if oldSize > 0 {
+				nm.DeletionCounter++
+				nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+			}
+		} else {
+			oldSize := nm.m.Delete(Key(key))
+			glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
+			nm.DeletionCounter++
+			nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+		}
+		return nil
+	})
+	glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
+	return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
+	var readerOffset int64
+	bytes := make([]byte, 16*RowsToRead)
+	count, e := r.ReadAt(bytes, readerOffset)
+	glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+	readerOffset += int64(count)
+	var (
+		key          uint64
+		offset, size uint32
+		i            int
+	)
+
+	for count > 0 && e == nil || e == io.EOF {
+		for i = 0; i+16 <= count; i += 16 {
+			key = util.BytesToUint64(bytes[i : i+8])
+			offset = util.BytesToUint32(bytes[i+8 : i+12])
+			size = util.BytesToUint32(bytes[i+12 : i+16])
+			if e = fn(key, offset, size); e != nil {
+				return e
+			}
+		}
+		if e == io.EOF {
+			return nil
+		}
+		count, e = r.ReadAt(bytes, readerOffset)
+		glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+		readerOffset += int64(count)
+	}
+	return e
+}
+
+func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
+	oldSize := nm.m.Set(Key(key), offset, size)
+	nm.logPut(key, oldSize, size)
+	return appendToIndexFile(nm.indexFile, key, offset, size)
+}
+func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
+	element, ok = nm.m.Get(Key(key))
+	return
+}
+func (nm *NeedleMap) Delete(key uint64) error {
+	deletedBytes := nm.m.Delete(Key(key))
+	nm.logDelete(deletedBytes)
+	return appendToIndexFile(nm.indexFile, key, 0, 0)
+}
+func (nm *NeedleMap) Close() {
+	_ = nm.indexFile.Close()
+}
+func (nm *NeedleMap) Destroy() error {
+	nm.Close()
+	return os.Remove(nm.indexFile.Name())
+}
+func (nm NeedleMap) ContentSize() uint64 {
+	return nm.FileByteCounter
+}
+func (nm NeedleMap) DeletedSize() uint64 {
+	return nm.DeletionByteCounter
+}
+func (nm NeedleMap) FileCount() int {
+	return nm.FileCounter
+}
+func (nm NeedleMap) DeletedCount() int {
+	return nm.DeletionCounter
+}
+
+func (nm NeedleMap) MaxFileKey() uint64 {
+	return nm.MaximumFileKey
+}
diff --git a/go/storage/store.go b/go/storage/store.go
index 2695537f6..65e5b218b 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -90,18 +90,18 @@ func (s *Store) String() (str string) {
 	return
 }
 
-func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
+func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) {
 	s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
 	s.Locations = make([]*DiskLocation, 0)
 	for i := 0; i < len(dirnames); i++ {
 		location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
 		location.volumes = make(map[VolumeId]*Volume)
-		location.loadExistingVolumes()
+		location.loadExistingVolumes(useLevelDb)
 		s.Locations = append(s.Locations, location)
 	}
 	return
 }
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error {
 	rt, e := NewReplicaPlacementFromString(replicaPlacement)
 	if e != nil {
 		return e
@@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
 		if err != nil {
 			return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
 		}
-		e = s.addVolume(VolumeId(id), collection, rt, ttl)
+		e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl)
 	} else {
 		pair := strings.Split(range_string, "-")
 		start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
 			return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
 		}
 		for id := start; id <= end; id++ {
-			if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
+			if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil {
 				e = err
 			}
 		}
@@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
 	}
 	return ret
 }
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
+func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
 	if s.findVolume(vid) != nil {
 		return fmt.Errorf("Volume Id %d already exists!", vid)
 	}
 	if location := s.findFreeLocation(); location != nil {
 		glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
 			location.Directory, vid, collection, replicaPlacement, ttl)
-		if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
+		if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil {
 			location.volumes[vid] = volume
 			return nil
 		} else {
@@ -195,20 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *Rep
 	return fmt.Errorf("No more free space left")
 }
 
-func (s *Store) FreezeVolume(volumeIdString string) error {
-	vid, err := NewVolumeId(volumeIdString)
-	if err != nil {
-		return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
-	}
-	if v := s.findVolume(vid); v != nil {
-		if v.readOnly {
-			return fmt.Errorf("Volume %s is already read-only", volumeIdString)
-		}
-		return v.freeze()
-	}
-	return fmt.Errorf("volume id %d is not found during freeze", vid)
-}
-func (l *DiskLocation) loadExistingVolumes() {
+func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) {
 	if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
 		for _, dir := range dirs {
 			name := dir.Name()
@@ -221,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes() {
 				}
 				if vid, err := NewVolumeId(base); err == nil {
 					if l.volumes[vid] == nil {
-						if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
+						if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil {
 							l.volumes[vid] = v
 							glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
 						}
@@ -261,7 +248,7 @@ func (s *Store) SetRack(rack string) {
 func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
 	s.masterNodes = NewMasterNodes(bootstrapMaster)
 }
-func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
+func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
 	masterNode, e = s.masterNodes.findMaster()
 	if e != nil {
 		return
@@ -317,13 +304,16 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
 		return "", "", err
 	}
 
-	jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
+	joinUrl := "http://" + masterNode + "/dir/join"
+
+	jsonBlob, err := util.PostBytes(joinUrl, data)
 	if err != nil {
 		s.masterNodes.reset()
 		return "", "", err
 	}
 	var ret operation.JoinResult
 	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
 		return masterNode, "", err
 	}
 	if ret.Error != "" {
@@ -354,7 +344,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
 		}
 		if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
 			glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
-			if _, _, e := s.Join(); e != nil {
+			if _, _, e := s.SendHeartbeatToMaster(); e != nil {
 				glog.V(0).Infoln("error when reporting size:", e)
 			}
 		}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 1988c9aac..2b47fb497 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -27,10 +27,10 @@ type Volume struct {
 	lastModifiedTime uint64 //unix time in seconds
 }
 
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
 	v = &Volume{dir: dirname, Collection: collection, Id: id}
 	v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
-	e = v.load(true, true)
+	e = v.load(true, true, useLevelDb)
 	return
 }
 func (v *Volume) String() string {
@@ -40,7 +40,7 @@ func (v *Volume) String() string {
 func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
 	v = &Volume{dir: dirname, Collection: collection, Id: id}
 	v.SuperBlock = SuperBlock{}
-	e = v.load(false, false)
+	e = v.load(false, false, false)
 	return
 }
 func (v *Volume) FileName() (fileName string) {
@@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) {
 	}
 	return
 }
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
 	var e error
 	fileName := v.FileName()
 
@@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
 		e = v.maybeWriteSuperBlock()
 	}
 	if e == nil && alsoLoadIndex {
-		if v.readOnly {
-			if v.ensureConvertIdxToCdb(fileName) {
-				v.nm, e = OpenCdbMap(fileName + ".cdb")
-				return e
-			}
-		}
 		var indexFile *os.File
 		if v.readOnly {
 			glog.V(1).Infoln("open to read file", fileName+".idx")
@@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
 				return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
 			}
 		}
-		glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
-		if v.nm, e = LoadNeedleMap(indexFile); e != nil {
-			glog.V(0).Infoln("loading error:", e)
+		if !useLevelDb {
+			glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
+			if v.nm, e = LoadNeedleMap(indexFile); e != nil {
+				glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+			}
+		} else {
+			glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+			if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
+				glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+			}
 		}
 	}
 	return e
@@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
 	}
 	nv, ok := v.nm.Get(n.Id)
 	if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
-		if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
+		if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
 			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
 		}
 	}
@@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) {
 	return -1, errors.New("Not Found")
 }
 
-func (v *Volume) freeze() error {
-	if v.readOnly {
-		return nil
-	}
-	nm, ok := v.nm.(*NeedleMap)
-	if !ok {
-		return nil
-	}
-	v.accessLock.Lock()
-	defer v.accessLock.Unlock()
-	bn, _ := baseFilename(v.dataFile.Name())
-	cdbFn := bn + ".cdb"
-	glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
-	err := DumpNeedleMapToCdb(cdbFn, nm)
-	if err != nil {
-		return err
-	}
-	if v.nm, err = OpenCdbMap(cdbFn); err != nil {
-		return err
-	}
-	nm.indexFile.Close()
-	os.Remove(nm.indexFile.Name())
-	v.readOnly = true
-	return nil
-}
-
 func ScanVolumeFile(dirname string, collection string, id VolumeId,
 	visitSuperBlock func(SuperBlock) error,
 	readNeedleBody bool,
@@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
 	modTime = fi.ModTime()
 	return
 }
-func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
-	var indexFile *os.File
-	var e error
-	_, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
-	_, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
-	if cdbCanRead && cdbModTime.After(idxModeTime) {
-		return true
-	}
-	if !cdbCanWrite {
-		return false
-	}
-	if !idxCanRead {
-		glog.V(0).Infoln("Can not read file", fileName+".idx!")
-		return false
-	}
-	glog.V(2).Infoln("opening file", fileName+".idx")
-	if indexFile, e = os.Open(fileName + ".idx"); e != nil {
-		glog.V(0).Infoln("Failed to read file", fileName+".idx !")
-		return false
-	}
-	defer indexFile.Close()
-	glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
-	if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
-		glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
-		return false
-	}
-	return true
-}
 
 // volume is expired if modified time + volume ttl < now
 // except when volume is empty
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7e026a61d..9f6f8e35f 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error {
 	}
 	//glog.V(3).Infof("Pretending to be vacuuming...")
 	//time.Sleep(20 * time.Second)
-	if e = v.load(true, false); e != nil {
+	if e = v.load(true, false, false); e != nil {
 		return e
 	}
 	return nil
@@ -73,7 +73,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
 			nv, ok := v.nm.Get(n.Id)
 			glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
 			if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
-				if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
+				if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
 					return fmt.Errorf("cannot put needle: %s", err)
 				}
 				if _, err = n.Append(dst, v.Version()); err != nil {
diff --git a/go/util/http_util.go b/go/util/http_util.go
index 72cab76e1..52579d746 100644
--- a/go/util/http_util.go
+++ b/go/util/http_util.go
@@ -26,12 +26,12 @@ func init() {
 func PostBytes(url string, body []byte) ([]byte, error) {
 	r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Post to %s: %v", url, err)
 	}
 	defer r.Body.Close()
 	b, err := ioutil.ReadAll(r.Body)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Read response body: %v", err)
 	}
 	return b, nil
 }
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 71c4ea90f..6ce55a609 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
 	}
 
 	vid := storage.VolumeId(*compactVolumeId)
-	v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil)
+	v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, false, nil, nil)
 	if err != nil {
 		glog.Fatalf("Load Volume [ERROR] %s\n", err)
 	}
diff --git a/go/weed/fix.go b/go/weed/fix.go
index b2df07554..d2cd40398 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -52,8 +52,8 @@ func runFix(cmd *Command, args []string) bool {
 	}, false, func(n *storage.Needle, offset int64) error {
 		glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped())
 		if n.Size > 0 {
-			count, pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
-			glog.V(2).Infof("saved %d with error %v", count, pe)
+			pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
+			glog.V(2).Infof("saved %d with error %v", n.Size, pe)
 		} else {
 			glog.V(2).Infof("skipping deleted file ...")
 			return nm.Delete(n.Id)
diff --git a/go/weed/server.go b/go/weed/server.go
index 48612a27b..71346de0a 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -68,6 +68,7 @@ var (
 	volumeDataFolders         = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
 	volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
 	volumePulse               = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+	volumeUseLevelDb          = cmdServer.Flag.Bool("volume.leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.")
 	volumeFixJpgOrientation   = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
 
 	isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
@@ -235,6 +236,7 @@ func runServer(cmd *Command, args []string) bool {
 		volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
 			*serverIp, *volumePort, *serverPublicUrl,
 			folders, maxCounts,
+			*volumeUseLevelDb,
 			*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
 			serverWhiteList, *volumeFixJpgOrientation,
 		)
diff --git a/go/weed/volume.go b/go/weed/volume.go
index aa2643d20..e2c6ebd94 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -32,6 +32,7 @@ type VolumeServerOptions struct {
 	dataCenter        *string
 	rack              *string
 	whiteList         []string
+	useLevelDb        *bool
 	fixJpgOrientation *bool
 }
 
@@ -48,6 +49,7 @@ func init() {
 	v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
 	v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
 	v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
+	v.useLevelDb = cmdVolume.Flag.Bool("leveldb", false, "Change to leveldb mode to save memory with reduced performance of read and write.")
 	v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
 }
 
@@ -116,6 +118,7 @@ func runVolume(cmd *Command, args []string) bool {
 	volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
 		*v.ip, *v.port, *v.publicUrl,
 		v.folders, v.folderMaxLimits,
+		*v.useLevelDb,
 		*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
 		v.whiteList,
 		*v.fixJpgOrientation,
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index e3878fac4..d84b39808 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -20,12 +20,14 @@ type VolumeServer struct {
 	store *storage.Store
 	guard *security.Guard
 
+	UseLevelDb        bool
 	FixJpgOrientation bool
 }
 
 func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	port int, publicUrl string,
 	folders []string, maxCounts []int,
+	useLevelDb bool,
 	masterNode string, pulseSeconds int,
 	dataCenter string, rack string,
 	whiteList []string,
@@ -34,10 +36,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 		pulseSeconds:      pulseSeconds,
 		dataCenter:        dataCenter,
 		rack:              rack,
+		UseLevelDb:        useLevelDb,
 		FixJpgOrientation: fixJpgOrientation,
 	}
 	vs.SetMasterNode(masterNode)
-	vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
+	vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.UseLevelDb)
 
 	vs.guard = security.NewGuard(whiteList, "")
 
@@ -47,7 +50,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
 	adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
 	adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
-	adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler))
 	adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
 	adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
 	adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
@@ -66,7 +68,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 		vs.store.SetDataCenter(vs.dataCenter)
 		vs.store.SetRack(vs.rack)
 		for {
-			master, secretKey, err := vs.store.Join()
+			master, secretKey, err := vs.store.SendHeartbeatToMaster()
 			if err == nil {
 				if !connected {
 					connected = true
@@ -75,7 +77,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 					glog.V(0).Infoln("Volume Server Connected with master at", master)
 				}
 			} else {
-				glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
+				glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs, err)
 				if connected {
 					connected = false
 				}
diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go
index c84b72db0..0d70a757e 100644
--- a/go/weed/weed_server/volume_server_handlers_admin.go
+++ b/go/weed/weed_server/volume_server_handlers_admin.go
@@ -17,7 +17,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
-	err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
+	err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.UseLevelDb, r.FormValue("replication"), r.FormValue("ttl"))
 	if err == nil {
 		writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
 	} else {
@@ -40,17 +40,6 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R
 	glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
 }
 
-func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
-	//TODO: notify master that this volume will be read-only
-	err := vs.store.FreezeVolume(r.FormValue("volume"))
-	if err == nil {
-		writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
-	} else {
-		writeJsonError(w, r, http.StatusInternalServerError, err)
-	}
-	glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err)
-}
-
 func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
 	m := make(map[string]interface{})
 	m["Version"] = util.VERSION