Add "weed backup" command.

This is a precursor to asynchronous replication.
chrislusf 2015-05-26 00:58:41 -07:00
parent 7272af8ec4
commit 86cd40fba8
17 changed files with 635 additions and 131 deletions
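
For quick reference, the new subcommand is invoked per its UsageLine (shown in go/weed/backup.go below); volume id 234 is just the sample value used there:

weed backup -dir=. -volumeId=234 -server=localhost:9333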

View File

@ -0,0 +1,54 @@
package operation
import (
"encoding/json"
"fmt"
"net/url"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/util"
)
type SyncVolumeResponse struct {
Replication string `json:"Replication,omitempty"`
Ttl string `json:"Ttl,omitempty"`
TailOffset uint64 `json:"TailOffset,omitempty"`
CompactRevision uint16 `json:"CompactRevision,omitempty"`
IdxFileSize uint64 `json:"IdxFileSize,omitempty"`
Error string `json:"error,omitempty"`
}
func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) {
values := make(url.Values)
values.Add("volume", vid)
jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values)
glog.V(2).Info("sync volume result: ", string(jsonBlob))
if err != nil {
return nil, err
}
var ret SyncVolumeResponse
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, err
}
if ret.Error != "" {
return nil, fmt.Errorf("Volume %s get sync status error: %s", vid, ret.Error)
}
return &ret, nil
}
func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error {
values := make(url.Values)
values.Add("volume", vid)
line := make([]byte, 16)
err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) {
key := util.BytesToUint64(bytes[:8])
offset := util.BytesToUint32(bytes[8:12])
size := util.BytesToUint32(bytes[12:16])
eachEntryFn(key, offset, size)
})
return err
}
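
As a usage sketch (not part of this commit), a caller could drive the two helpers above like this; the volume server address localhost:8080 and volume id 3 are assumed values:

package main
import (
"fmt"
"log"
"github.com/chrislusf/seaweedfs/go/operation"
)
func main() {
server, vid := "localhost:8080", "3" // assumed volume server and volume id
status, err := operation.GetVolumeSyncStatus(server, vid)
if err != nil {
log.Fatal(err)
}
fmt.Printf("tail offset %d, compact revision %d\n", status.TailOffset, status.CompactRevision)
// stream every 16-byte .idx entry: 8-byte key, 4-byte offset, 4-byte size
err = operation.GetVolumeIdxEntries(server, vid, func(key uint64, offset, size uint32) {
fmt.Println(key, offset, size)
})
if err != nil {
log.Fatal(err)
}
}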

View File

@ -2,7 +2,9 @@ package storage
import (
"fmt"
"io/ioutil"
"os"
"sync"
"github.com/chrislusf/seaweedfs/go/util"
)
@ -26,9 +28,60 @@ type NeedleMapper interface {
FileCount() int
DeletedCount() int
MaxFileKey() uint64
IndexFileSize() uint64
IndexFileContent() ([]byte, error)
IndexFileName() string
}
type baseNeedleMapper struct {
indexFile *os.File
indexFileAccessLock sync.Mutex
mapMetric
}
func (nm baseNeedleMapper) IndexFileSize() uint64 {
stat, err := nm.indexFile.Stat()
if err == nil {
return uint64(stat.Size())
}
return 0
}
func (nm baseNeedleMapper) IndexFileName() string {
return nm.indexFile.Name()
}
func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) {
key = util.BytesToUint64(bytes[:8])
offset = util.BytesToUint32(bytes[8:12])
size = util.BytesToUint32(bytes[12:16])
return
}
func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock()
if _, err := nm.indexFile.Seek(0, 2); err != nil {
return fmt.Errorf("cannot seek end of indexfile %s: %v",
nm.indexFile.Name(), err)
}
_, err := nm.indexFile.Write(bytes)
return err
}
func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) {
nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock()
return ioutil.ReadFile(nm.indexFile.Name())
}
type mapMetric struct {
indexFile *os.File
DeletionCounter int `json:"DeletionCounter"`
FileCounter int `json:"FileCounter"`
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
@ -36,20 +89,6 @@ type mapMetric struct {
MaximumFileKey uint64 `json:"MaxFileKey"`
}
func appendToIndexFile(indexFile *os.File,
key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
if _, err := indexFile.Seek(0, 2); err != nil {
return fmt.Errorf("cannot seek end of indexfile %s: %v",
indexFile.Name(), err)
}
_, err := indexFile.Write(bytes)
return err
}
func (mm *mapMetric) logDelete(deletedByteCount uint32) {
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
mm.DeletionCounter++
@ -66,3 +105,19 @@ func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
}
}
func (mm mapMetric) ContentSize() uint64 {
return mm.FileByteCounter
}
func (mm mapMetric) DeletedSize() uint64 {
return mm.DeletionByteCounter
}
func (mm mapMetric) FileCount() int {
return mm.FileCounter
}
func (mm mapMetric) DeletedCount() int {
return mm.DeletionCounter
}
func (mm mapMetric) MaxFileKey() uint64 {
return mm.MaximumFileKey
}
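
To make the .idx record layout concrete, here is a minimal round trip of one 16-byte entry as it would look inside package storage, mirroring appendToIndexFile and idxFileEntry above (the values are examples only):

entry := make([]byte, 16)
util.Uint64toBytes(entry[0:8], 0x1234) // needle key
util.Uint32toBytes(entry[8:12], 42) // offset, stored in units of NeedlePaddingSize
util.Uint32toBytes(entry[12:16], 1024) // size in bytes; 0 marks a deletion
key, offset, size := idxFileEntry(entry) // key == 0x1234, offset == 42, size == 1024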

View File

@ -12,15 +12,15 @@ import (
type BoltDbNeedleMap struct {
dbFileName string
indexFile *os.File
db *bolt.DB
mapMetric
baseNeedleMapper
}
var boltdbBucket = []byte("weed")
func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) {
m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
m = &BoltDbNeedleMap{dbFileName: dbFileName}
m.indexFile = indexFile
if !isBoltDbFresh(dbFileName, indexFile) {
glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
generateBoltDbFile(dbFileName, indexFile)
@ -101,7 +101,7 @@ func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
}
m.logPut(key, oldSize, size)
// write to index file first
if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
if err := m.appendToIndexFile(key, offset, size); err != nil {
return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
}
return boltDbWrite(m.db, key, offset, size)
@ -148,7 +148,7 @@ func (m *BoltDbNeedleMap) Delete(key uint64) error {
m.logDelete(oldNeedle.Size)
}
// write to index file first
if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
if err := m.appendToIndexFile(key, 0, 0); err != nil {
return err
}
return boltDbDelete(m.db, key)
@ -163,19 +163,3 @@ func (m *BoltDbNeedleMap) Destroy() error {
os.Remove(m.indexFile.Name())
return os.Remove(m.dbFileName)
}
func (m *BoltDbNeedleMap) ContentSize() uint64 {
return m.FileByteCounter
}
func (m *BoltDbNeedleMap) DeletedSize() uint64 {
return m.DeletionByteCounter
}
func (m *BoltDbNeedleMap) FileCount() int {
return m.FileCounter
}
func (m *BoltDbNeedleMap) DeletedCount() int {
return m.DeletionCounter
}
func (m *BoltDbNeedleMap) MaxFileKey() uint64 {
return m.MaximumFileKey
}

View File

@ -12,13 +12,13 @@ import (
type LevelDbNeedleMap struct {
dbFileName string
indexFile *os.File
db *leveldb.DB
mapMetric
baseNeedleMapper
}
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) {
m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName}
m = &LevelDbNeedleMap{dbFileName: dbFileName}
m.indexFile = indexFile
if !isLevelDbFresh(dbFileName, indexFile) {
glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
generateLevelDbFile(dbFileName, indexFile)
@ -89,7 +89,7 @@ func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
}
m.logPut(key, oldSize, size)
// write to index file first
if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil {
if err := m.appendToIndexFile(key, offset, size); err != nil {
return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
}
return levelDbWrite(m.db, key, offset, size)
@ -117,7 +117,7 @@ func (m *LevelDbNeedleMap) Delete(key uint64) error {
m.logDelete(oldNeedle.Size)
}
// write to index file first
if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil {
if err := m.appendToIndexFile(key, 0, 0); err != nil {
return err
}
return levelDbDelete(m.db, key)
@ -132,19 +132,3 @@ func (m *LevelDbNeedleMap) Destroy() error {
os.Remove(m.indexFile.Name())
return os.Remove(m.dbFileName)
}
func (m *LevelDbNeedleMap) ContentSize() uint64 {
return m.FileByteCounter
}
func (m *LevelDbNeedleMap) DeletedSize() uint64 {
return m.DeletionByteCounter
}
func (m *LevelDbNeedleMap) FileCount() int {
return m.FileCounter
}
func (m *LevelDbNeedleMap) DeletedCount() int {
return m.DeletionCounter
}
func (m *LevelDbNeedleMap) MaxFileKey() uint64 {
return m.MaximumFileKey
}

View File

@ -5,21 +5,19 @@ import (
"os"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/util"
)
type NeedleMap struct {
indexFile *os.File
m CompactMap
m CompactMap
mapMetric
baseNeedleMapper
}
func NewNeedleMap(file *os.File) *NeedleMap {
nm := &NeedleMap{
m: NewCompactMap(),
indexFile: file,
m: NewCompactMap(),
}
nm.indexFile = file
return nm
}
@ -70,9 +68,7 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
for count > 0 && e == nil || e == io.EOF {
for i = 0; i+16 <= count; i += 16 {
key = util.BytesToUint64(bytes[i : i+8])
offset = util.BytesToUint32(bytes[i+8 : i+12])
size = util.BytesToUint32(bytes[i+12 : i+16])
key, offset, size = idxFileEntry(bytes[i : i+16])
if e = fn(key, offset, size); e != nil {
return e
}
@ -90,7 +86,7 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
oldSize := nm.m.Set(Key(key), offset, size)
nm.logPut(key, oldSize, size)
return appendToIndexFile(nm.indexFile, key, offset, size)
return nm.appendToIndexFile(key, offset, size)
}
func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
element, ok = nm.m.Get(Key(key))
@ -99,7 +95,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
func (nm *NeedleMap) Delete(key uint64) error {
deletedBytes := nm.m.Delete(Key(key))
nm.logDelete(deletedBytes)
return appendToIndexFile(nm.indexFile, key, 0, 0)
return nm.appendToIndexFile(key, 0, 0)
}
func (nm *NeedleMap) Close() {
_ = nm.indexFile.Close()
@ -108,19 +104,3 @@ func (nm *NeedleMap) Destroy() error {
nm.Close()
return os.Remove(nm.indexFile.Name())
}
func (nm NeedleMap) ContentSize() uint64 {
return nm.FileByteCounter
}
func (nm NeedleMap) DeletedSize() uint64 {
return nm.DeletionByteCounter
}
func (nm NeedleMap) FileCount() int {
return nm.FileCounter
}
func (nm NeedleMap) DeletedCount() int {
return nm.DeletionCounter
}
func (nm NeedleMap) MaxFileKey() uint64 {
return nm.MaximumFileKey
}

View File

@ -135,49 +135,37 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
func (n *Needle) Read(r *os.File, offset int64, size uint32, version Version) (ret int, err error) {
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (bytes []byte, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
bytes = make([]byte, NeedleHeaderSize+size+NeedleChecksumSize+padding)
_, err = r.ReadAt(bytes, offset)
return
}
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
bytes, err := ReadNeedleBlob(r, offset, size)
if err != nil {
return err
}
n.ParseNeedleHeader(bytes)
if n.Size != size {
return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
}
switch version {
case Version1:
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
if ret, err = r.ReadAt(bytes, offset); err != nil {
return
}
n.readNeedleHeader(bytes)
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() {
return 0, errors.New("CRC error! Data On Disk Corrupted")
}
n.Checksum = newChecksum
return
case Version2:
if size == 0 {
return 0, nil
}
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
if ret, err = r.ReadAt(bytes, offset); err != nil {
return
}
if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) {
return 0, errors.New("File Entry Not Found")
}
n.readNeedleHeader(bytes)
if n.Size != size {
return 0, fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
}
n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() {
return 0, errors.New("CRC Found Data On Disk Corrupted")
}
n.Checksum = newChecksum
return
}
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() {
return errors.New("CRC error! Data On Disk Corrupted")
}
n.Checksum = newChecksum
return nil
}
func (n *Needle) readNeedleHeader(bytes []byte) {
func (n *Needle) ParseNeedleHeader(bytes []byte) {
n.Cookie = util.BytesToUint32(bytes[0:4])
n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
@ -228,7 +216,7 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bod
if count <= 0 || err != nil {
return nil, 0, err
}
n.readNeedleHeader(bytes)
n.ParseNeedleHeader(bytes)
padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
bodyLength = n.Size + NeedleChecksumSize + padding
}
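
A worked example of ReadNeedleBlob's sizing above, assuming this codebase's usual constants NeedleHeaderSize = 16, NeedleChecksumSize = 4 and NeedlePaddingSize = 8:

size := uint32(100)
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
total := NeedleHeaderSize + size + NeedleChecksumSize + padding
// 16 + 100 + 4 = 120 is already 8-aligned, so padding comes out as a full
// 8 bytes and total = 128; for size = 99 the sum is 119, padding = 1, total = 120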

View File

@ -371,9 +371,9 @@ func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
}
return 0, nil
}
func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) {
if v := s.findVolume(i); v != nil {
return v.read(n)
return v.readNeedle(n)
}
return 0, fmt.Errorf("Volume %v not found!", i)
}

View File

@ -54,6 +54,9 @@ func (v *Volume) FileName() (fileName string) {
}
return
}
func (v *Volume) DataFile() *os.File {
return v.dataFile
}
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
var e error
fileName := v.FileName()
@ -152,7 +155,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
nv, ok := v.nm.Get(n.Id)
if ok && nv.Offset > 0 {
oldNeedle := new(Needle)
_, err := oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if err != nil {
glog.V(0).Infof("Failed to check updated file %v", err)
return false
@ -180,6 +183,30 @@ func (v *Volume) Destroy() (err error) {
return
}
// AppendBlob append a blob to end of the data file, used in replication
func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
if v.readOnly {
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
return
}
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
}
// ensure writing starts from an aligned position
if offset%NeedlePaddingSize != 0 {
offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
if offset, err = v.dataFile.Seek(offset, 0); err != nil {
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
return
}
}
_, err = v.dataFile.Write(b)
return
}
func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
if v.readOnly {
@ -250,17 +277,19 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
return 0, nil
}
func (v *Volume) read(n *Needle) (int, error) {
// readNeedle fills in the needle's content by looking up n.Id in the NeedleMapper
func (v *Volume) readNeedle(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset == 0 {
return -1, errors.New("Not Found")
}
bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if err != nil {
return bytesRead, err
return 0, err
}
bytesRead := len(n.Data)
if !n.HasTtl() {
return bytesRead, err
return bytesRead, nil
}
ttlMinutes := n.Ttl.Minutes()
if ttlMinutes == 0 {

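A small sketch of the alignment rule AppendBlob enforces above, assuming NeedlePaddingSize = 8; the offsets are example values only:

offset := int64(1001) // hypothetical current end of the .dat file
if offset%NeedlePaddingSize != 0 {
offset += NeedlePaddingSize - offset%NeedlePaddingSize // 1001 -> 1008
}
// the needle map stores offset/NeedlePaddingSize (1008/8 = 126) and readers
// multiply back by NeedlePaddingSize, as readNeedle above does
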
View File: go/storage/volume_sync.go (new file, 213 lines)

@ -0,0 +1,213 @@
package storage
import (
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"sort"
"strconv"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/util"
)
// A slave volume synchronizes with a master volume in 2 steps:
// 1. The slave checks the master side to find the subscription checkpoint
// needed to set up the replication.
// 2. The slave receives the updates from the master.
/*
Assume the slave volume needs to follow the master volume.
The master volume could be compacted, and could be many file entries ahead of
the slave volume.
Step 1:
The slave volume asks the master volume for a snapshot
of (existing file entries, last offset, number of compacted times).
For each entry x in the master's existing file entries:
if x does not exist locally:
add x locally
For each entry y in the local slave's existing file entries:
if y does not exist on the master:
delete y locally
Step 2:
After this, use the last offset and the number of compacted times to request
the master volume to send the newly appended data, and keep looping. If the
number of compacted times has changed, go back to step 1 (very likely this
can be optimized further later).
*/
func (v *Volume) Synchronize(volumeServer string) (err error) {
var lastCompactRevision uint16 = 0
var compactRevision uint16 = 0
var masterMap CompactMap
for i := 0; i < 3; i++ {
if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
}
if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
if err = v.Compact(); err != nil {
return fmt.Errorf("Compact Volume before synchronizing %v", err)
}
if err = v.commitCompact(); err != nil {
return fmt.Errorf("Commit Compact before synchronizing %v", err)
}
}
lastCompactRevision = compactRevision
if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
return
}
}
return
}
type ByOffset []NeedleValue
func (a ByOffset) Len() int { return len(a) }
func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
// trySynchronizing syncs with the remote volume server incrementally by
// making up the local and remote delta.
func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error {
slaveIdxFile, err := os.Open(v.nm.IndexFileName())
if err != nil {
return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
}
defer slaveIdxFile.Close()
slaveMap, err := LoadNeedleMap(slaveIdxFile)
if err != nil {
return fmt.Errorf("Load volume %d index file: %v", v.Id, err)
}
var delta []NeedleValue
if err := masterMap.Visit(func(needleValue NeedleValue) error {
if needleValue.Key == 0 {
return nil
}
if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok {
return nil // skip intersection
}
delta = append(delta, needleValue)
return nil
}); err != nil {
return fmt.Errorf("Add master entry: %v", err)
}
if err := slaveMap.m.Visit(func(needleValue NeedleValue) error {
if needleValue.Key == 0 {
return nil
}
if _, ok := masterMap.Get(needleValue.Key); ok {
return nil // skip intersection
}
needleValue.Size = 0
delta = append(delta, needleValue)
return nil
}); err != nil {
return fmt.Errorf("Remove local entry: %v", err)
}
// replay in the same order as the remote .dat file's needle entries
sort.Sort(ByOffset(delta))
// make up the delta
fetchCount := 0
volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data"
for _, needleValue := range delta {
if needleValue.Size == 0 {
// remove file entry from local
v.removeNeedle(needleValue.Key)
continue
}
// add master file entry to local data file
if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil {
glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
return err
}
fetchCount++
}
glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer)
return nil
}
func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) {
m = NewCompactMap()
syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil {
return m, 0, 0, err
}
total := 0
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset != 0 && size != 0 {
m.Set(Key(key), offset, size)
} else {
m.Delete(Key(key))
}
total++
})
glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision)
return m, syncStatus.TailOffset, syncStatus.CompactRevision, err
}
func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse {
var syncStatus = operation.SyncVolumeResponse{}
if stat, err := v.dataFile.Stat(); err == nil {
syncStatus.TailOffset = uint64(stat.Size())
}
syncStatus.IdxFileSize = v.nm.IndexFileSize()
syncStatus.CompactRevision = v.SuperBlock.CompactRevision
syncStatus.Ttl = v.SuperBlock.Ttl.String()
syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
return syncStatus
}
func (v *Volume) IndexFileContent() ([]byte, error) {
return v.nm.IndexFileContent()
}
// removeNeedle removes one needle by needle key
func (v *Volume) removeNeedle(key Key) {
n := new(Needle)
n.Id = uint64(key)
v.delete(n)
}
// fetchNeedle fetches a remote volume needle by vid, id, offset.
// The compact revision is checked first, in case the remote volume
// has been compacted and the offset is no longer valid.
func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
needleValue NeedleValue, compactRevision uint16) error {
// add master file entry to local data file
values := make(url.Values)
values.Add("revision", strconv.Itoa(int(compactRevision)))
values.Add("volume", v.Id.String())
values.Add("id", needleValue.Key.String())
values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10))
values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10))
glog.V(4).Infof("Fetch %+v", needleValue)
return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error {
b, err := ioutil.ReadAll(r)
if err != nil {
return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
}
offset, err := v.AppendBlob(b)
if err != nil {
return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
}
// println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size)
return nil
})
}

View File

@ -3,6 +3,7 @@ package util
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
@ -84,6 +85,43 @@ func Delete(url string, jwt security.EncodedJwt) error {
return nil
}
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
r, err := client.PostForm(url, values)
if err != nil {
return err
}
defer r.Body.Close()
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
bufferSize := len(allocatedBytes)
for {
n, err := r.Body.Read(allocatedBytes)
// eachBuffer fires only on full reads: callers stream fixed-size
// records, so a partial read is not passed along
if n == bufferSize {
eachBuffer(allocatedBytes)
}
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}
func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
r, err := client.PostForm(url, values)
if err != nil {
return err
}
defer r.Body.Close()
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
return readFn(r.Body)
}
func DownloadUrl(fileUrl string) (filename string, content []byte, e error) {
response, err := client.Get(fileUrl)
if err != nil {

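As a usage sketch for GetBufferStream above (the server address and volume id are assumed values), the new /admin/sync/index endpoint streams fixed 16-byte records, one callback per full buffer:

record := make([]byte, 16) // reused for every full read
err := util.GetBufferStream("http://localhost:8080/admin/sync/index",
url.Values{"volume": {"3"}}, record, func(b []byte) {
// b is exactly 16 bytes here; decode key/offset/size
// the way operation.GetVolumeIdxEntries does
})
if err != nil {
fmt.Println("stream failed:", err)
}
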
View File: go/weed/backup.go (new file, 90 lines)

@ -0,0 +1,90 @@
package main
import (
"fmt"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/storage"
)
var (
s BackupOptions
)
type BackupOptions struct {
master *string
collection *string
dir *string
volumeId *int
}
func init() {
cmdBackup.Run = runBackup // break init cycle
s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location")
s.collection = cmdBackup.Flag.String("collection", "", "collection name")
s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files")
s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
}
var cmdBackup = &Command{
UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333",
Short: "incrementally backup a volume to local folder",
Long: `Incrementally backup volume data.
It is expected that you use this inside a script, to loop through
all possible volume ids that need to be backed up to a local folder.
The volume id does not need to exist locally or even remotely.
This helps to back up future new volumes.
Usually backing up is just copying the .dat (and .idx) files.
But it's tricky to incrementally copy the differences.
The complexity comes when there are multiple additions, deletions and compactions.
This tool handles them correctly and efficiently, avoiding unnecessary data transportation.
`,
}
func runBackup(cmd *Command, args []string) bool {
if *s.volumeId == -1 {
return false
}
vid := storage.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.Lookup(*s.master, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
}
volumeServer := lookup.Locations[0].Url
stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil {
fmt.Printf("Error get volume %d status: %v\n", vid, err)
return true
}
ttl, err := storage.ReadTTL(stats.Ttl)
if err != nil {
fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
return true
}
replication, err := storage.NewReplicaPlacementFromString(stats.Replication)
if err != nil {
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
return true
}
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
}
if err := v.Synchronize(volumeServer); err != nil {
fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
return true
}
return true
}

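Since the help text suggests scripting over all volume ids, a hypothetical Go driver could loop over the same library calls runBackup makes; maxVolumeId and master are assumed to be supplied by the caller:

for i := 1; i <= maxVolumeId; i++ {
vid := storage.VolumeId(i)
lookup, err := operation.Lookup(master, vid.String())
if err != nil || len(lookup.Locations) == 0 {
continue // volume not allocated (yet); skip it
}
// ...then continue exactly as runBackup does above...
}
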
View File

@ -7,7 +7,6 @@ import (
func init() {
cmdCompact.Run = runCompact // break init cycle
cmdCompact.IsDebug = cmdCompact.Flag.Bool("debug", false, "enable debug mode")
}
var cmdCompact = &Command{

View File

@ -21,6 +21,7 @@ var server *string
var commands = []*Command{
cmdBenchmark,
cmdBackup,
cmdCompact,
cmdFix,
cmdServer,

View File

@ -51,6 +51,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))

View File

@ -47,7 +47,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
count, e := vs.store.Read(volumeId, n)
count, e := vs.store.ReadVolumeNeedle(volumeId, n)
glog.V(4).Infoln("read bytes", count, "error", e)
if e != nil || count <= 0 {
glog.V(0).Infoln("read error:", e, r.URL.Path)

View File

@ -0,0 +1,86 @@
package weed_server
import (
"fmt"
"net/http"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/util"
)
func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) {
v, err := vs.getVolume("volume", r)
if v == nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
syncStat := v.GetVolumeSyncStatus()
if syncStat.Error != "" {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error))
glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", syncStat.Error)
} else {
writeJsonQuiet(w, r, http.StatusOK, syncStat)
}
}
func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) {
v, err := vs.getVolume("volume", r)
if v == nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
content, err := v.IndexFileContent()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
w.Write(content)
}
func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) {
v, err := vs.getVolume("volume", r)
if v == nil {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err))
return
}
if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) {
writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision))
return
}
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0))
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
id := util.ParseUint64(r.FormValue("id"), 0)
n := new(storage.Needle)
n.ParseNeedleHeader(content)
if id != n.Id {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id))
return
}
w.Write(content)
}
func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) {
volumeIdString := r.FormValue(volumeParameterName)
if volumeIdString == "" {
err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName)
return nil, err
}
vid, err := storage.NewVolumeId(volumeIdString)
if err != nil {
err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
return nil, err
}
v := vs.store.GetVolume(vid)
if v == nil {
return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid)
}
return v, nil
}

View File

@ -53,7 +53,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infoln("deleting", n)
cookie := n.Cookie
count, ok := vs.store.Read(volumeId, n)
count, ok := vs.store.ReadVolumeNeedle(volumeId, n)
if ok != nil {
m := make(map[string]uint32)
@ -94,7 +94,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
n.ParsePath(id_cookie)
glog.V(4).Infoln("batch deleting", n)
cookie := n.Cookie
if _, err := vs.store.Read(volumeId, n); err != nil {
if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
continue
}