diff --git a/.gitignore b/.gitignore index ab4f54fd5..d121337a5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -weed +go/weed/.goxc* tags *.swp diff --git a/docs/changelist.rst b/docs/changelist.rst index c4cfb9c1d..10ceb436e 100644 --- a/docs/changelist.rst +++ b/docs/changelist.rst @@ -5,6 +5,9 @@ Introduction ############ This file contains list of recent changes, important features, usage changes, data format changes, etc. Do read this if you upgrade. +v0.68 +##### +1. Filer supports storing file~file_id mapping to remote key-value storage Redis, Cassandra. So multiple filers are supported. v0.67 ##### diff --git a/go/filer/embedded_filer/directory_in_map.go b/go/filer/embedded_filer/directory_in_map.go index a1d0f43bd..1b904c874 100644 --- a/go/filer/embedded_filer/directory_in_map.go +++ b/go/filer/embedded_filer/directory_in_map.go @@ -62,7 +62,7 @@ func NewDirectoryManagerInMap(dirLogFile string) (dm *DirectoryManagerInMap, err //dm.Root do not use NewDirectoryEntryInMap, since dm.max will be changed dm.Root = &DirectoryEntryInMap{SubDirectories: make(map[string]*DirectoryEntryInMap)} if dm.logFile, err = os.OpenFile(dirLogFile, os.O_RDWR|os.O_CREATE, 0644); err != nil { - return nil, fmt.Errorf("cannot write directory log file %s.idx: %s", dirLogFile, err.Error()) + return nil, fmt.Errorf("cannot write directory log file %s.idx: %v", dirLogFile, err) } return dm, dm.load() } diff --git a/go/operation/lookup.go b/go/operation/lookup.go index 70bc7146e..c05eb1d2b 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -27,14 +27,14 @@ func (lr *LookupResult) String() string { } var ( - vc VidCache + vc VidCache // caching of volume locations, re-check if after 10 minutes ) func Lookup(server string, vid string) (ret *LookupResult, err error) { locations, cache_err := vc.Get(vid) if cache_err != nil { if ret, err = do_lookup(server, vid); err == nil { - vc.Set(vid, ret.Locations, 1*time.Minute) + vc.Set(vid, ret.Locations, 10*time.Minute) } } else { ret = &LookupResult{VolumeId: vid, Locations: locations} @@ -75,19 +75,44 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + "/" + fileId, nil } +// LookupVolumeIds find volume locations by cache and actual lookup func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { - values := make(url.Values) + ret := make(map[string]LookupResult) + var unknown_vids []string + + //check vid cache first for _, vid := range vids { + locations, cache_err := vc.Get(vid) + if cache_err == nil { + ret[vid] = LookupResult{VolumeId: vid, Locations: locations} + } else { + unknown_vids = append(unknown_vids, vid) + } + } + //return success if all volume ids are known + if len(unknown_vids) == 0 { + return ret, nil + } + + //only query unknown_vids + values := make(url.Values) + for _, vid := range unknown_vids { values.Add("volumeId", vid) } jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) if err != nil { return nil, err } - ret := make(map[string]LookupResult) err = json.Unmarshal(jsonBlob, &ret) if err != nil { return nil, errors.New(err.Error() + " " + string(jsonBlob)) } + + //set newly checked vids to cache + for _, vid := range unknown_vids { + locations := ret[vid].Locations + vc.Set(vid, locations, 10*time.Minute) + } + return ret, nil } diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index fbb59e9c0..1902ea5eb 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -214,12 +214,12 @@ func DumpNeedleMapToCdb(cdbName string, nm *NeedleMap) error { func openTempCdb(fileName string) (cdb.AdderFunc, cdb.CloserFunc, error) { fh, err := os.Create(fileName) if err != nil { - return nil, nil, fmt.Errorf("cannot create cdb file %s: %s", fileName, err.Error()) + return nil, nil, fmt.Errorf("cannot create cdb file %s: %v", fileName, err) } adder, closer, err := cdb.MakeFactory(fh) if err != nil { fh.Close() - return nil, nil, fmt.Errorf("error creating factory: %s", err.Error()) + return nil, nil, fmt.Errorf("error creating factory: %v", err) } return adder, func() error { if e := closer(); e != nil { diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 504ca1552..98a85b7ab 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -126,7 +126,7 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } if _, err := nm.indexFile.Seek(0, 2); err != nil { - return 0, fmt.Errorf("cannot go to the end of indexfile %s: %s", nm.indexFile.Name(), err.Error()) + return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) } return nm.indexFile.Write(bytes) } @@ -141,10 +141,10 @@ func (nm *NeedleMap) Delete(key uint64) error { util.Uint32toBytes(bytes[8:12], 0) util.Uint32toBytes(bytes[12:16], 0) if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot go to the end of indexfile %s: %s", nm.indexFile.Name(), err.Error()) + return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) } if _, err := nm.indexFile.Write(bytes); err != nil { - return fmt.Errorf("error writing to indexfile %s: %s", nm.indexFile.Name(), err.Error()) + return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err) } nm.DeletionCounter++ return nil diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 663b5abbd..6925c04e0 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -30,12 +30,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { defer func(s io.Seeker, off int64) { if err != nil { if _, e = s.Seek(off, 0); e != nil { - glog.V(0).Infof("Failed to seek %s back to %d with error: %s", w, off, e.Error()) + glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e) } } }(s, end) } else { - err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e.Error()) + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) return } } diff --git a/go/storage/store.go b/go/storage/store.go index 65eed1d0e..7e2b23058 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -64,7 +64,7 @@ func (mn *MasterNodes) findMaster() (string, error) { } } if mn.lastNode < 0 { - return "", errors.New("No master node avalable!") + return "", errors.New("No master node available!") } return mn.nodes[mn.lastNode], nil } diff --git a/go/storage/volume.go b/go/storage/volume.go index a1eccd62c..6ef72cfcd 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -175,7 +175,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { } var offset int64 if offset, err = v.dataFile.Seek(0, 2); err != nil { - glog.V(0).Infof("faile to seek the end of file: %v", err) + glog.V(0).Infof("failed to seek the end of file: %v", err) return } @@ -287,10 +287,10 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, visitNeedle func(n *Needle, offset int64) error) (err error) { var v *Volume if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil { - return errors.New("Failed to load volume:" + err.Error()) + return fmt.Errorf("Failed to load volume %d: %v", id, err) } if err = visitSuperBlock(v.SuperBlock); err != nil { - return errors.New("Failed to read super block:" + err.Error()) + return fmt.Errorf("Failed to read volume %d super block: %v", id, err) } version := v.Version() diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index bc8049ea4..a7f3acf2b 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -39,5 +39,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er } func (vi VolumeInfo) String() string { - return fmt.Sprintf("Id:%s, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) + return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", + vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 57e0deea9..2fb2ed4bf 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -38,7 +38,7 @@ func (s *SuperBlock) Bytes() []byte { func (v *Volume) maybeWriteSuperBlock() error { stat, e := v.dataFile.Stat() if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error()) + glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile, e) return e } if stat.Size() == 0 { @@ -57,11 +57,11 @@ func (v *Volume) maybeWriteSuperBlock() error { } func (v *Volume) readSuperBlock() (err error) { if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error()) + return fmt.Errorf("cannot seek to the beginning of %s: %v", v.dataFile.Name(), err) } header := make([]byte, SuperBlockSize) if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e.Error()) + return fmt.Errorf("cannot read volume %d super block: %v", v.Id, e) } v.SuperBlock, err = ParseSuperBlock(header) return err diff --git a/go/topology/configuration_test.go b/go/topology/configuration_test.go index 35d82c058..0a353d16e 100644 --- a/go/topology/configuration_test.go +++ b/go/topology/configuration_test.go @@ -33,7 +33,7 @@ func TestLoadConfiguration(t *testing.T) { fmt.Printf("%s\n", c) if err != nil { - t.Fatalf("unmarshal error:%s", err.Error()) + t.Fatalf("unmarshal error:%v", err) } if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { diff --git a/go/topology/topology.go b/go/topology/topology.go index c2073ed2f..4cfd070db 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -119,7 +119,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { - return "", 0, nil, errors.New("No writable volumes avalable!") + return "", 0, nil, errors.New("No writable volumes available!") } fileId, count := t.Sequence.NextFileId(count) return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 6124c0da2..3ad6ad757 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -204,7 +204,7 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum glog.V(0).Infoln("Created Volume", vid, "on", server) } else { glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err) - return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error()) + return fmt.Errorf("Failed to assign %d: %v", vid, err) } } return nil diff --git a/go/util/constants.go b/go/util/constants.go index d677ba44f..1aa59d634 100644 --- a/go/util/constants.go +++ b/go/util/constants.go @@ -1,5 +1,5 @@ package util const ( - VERSION = "0.67" + VERSION = "0.68" ) diff --git a/go/weed/filer.go b/go/weed/filer.go index 5b3fd2b67..4e7191e34 100644 --- a/go/weed/filer.go +++ b/go/weed/filer.go @@ -77,7 +77,7 @@ func runFiler(cmd *Command, args []string) bool { *f.redis_server, *f.redis_database, ) if nfs_err != nil { - glog.Fatalf(nfs_err.Error()) + glog.Fatalf("Filer startup error: %v", nfs_err) } glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*f.port)) filerListener, e := util.NewListener( @@ -85,10 +85,10 @@ func runFiler(cmd *Command, args []string) bool { time.Duration(10)*time.Second, ) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Filer listener error: %v", e) } if e := http.Serve(filerListener, r); e != nil { - glog.Fatalf("Filer Fail to serve:%s", e.Error()) + glog.Fatalf("Filer Fail to serve: %v", e) } return true diff --git a/go/weed/master.go b/go/weed/master.go index de4b5cb4b..13f6d7c43 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -71,7 +71,7 @@ func runMaster(cmd *Command, args []string) bool { listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Master startup error: %v", e) } go func() { @@ -93,7 +93,7 @@ func runMaster(cmd *Command, args []string) bool { }() if e := http.Serve(listener, r); e != nil { - glog.Fatalf("Fail to serve:%s", e.Error()) + glog.Fatalf("Fail to serve: %v", e) } return true } diff --git a/go/weed/server.go b/go/weed/server.go index 0b973f7e1..980c545b4 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -81,10 +81,10 @@ func init() { filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") - filerOptions.cassandra_server = cmdFiler.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") - filerOptions.cassandra_keyspace = cmdFiler.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") + filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") + filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") - filerOptions.redis_database = cmdFiler.Flag.Int("filer.redis.database", 0, "the database on the redis server") + filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") } @@ -167,7 +167,7 @@ func runServer(cmd *Command, args []string) bool { "", 0, ) if nfs_err != nil { - glog.Fatalf(nfs_err.Error()) + glog.Fatalf("Filer startup error: %v", nfs_err) } glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port)) filerListener, e := util.NewListener( @@ -175,10 +175,11 @@ func runServer(cmd *Command, args []string) bool { time.Duration(10)*time.Second, ) if e != nil { + glog.Fatalf("Filer listener error: %v", e) glog.Fatalf(e.Error()) } if e := http.Serve(filerListener, r); e != nil { - glog.Fatalf("Filer Fail to serve:%s", e.Error()) + glog.Fatalf("Filer Fail to serve: %v", e) } }() } @@ -199,7 +200,7 @@ func runServer(cmd *Command, args []string) bool { glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Master startup error: %v", e) } go func() { @@ -224,7 +225,7 @@ func runServer(cmd *Command, args []string) bool { volumeWait.Wait() time.Sleep(100 * time.Millisecond) r := http.NewServeMux() - volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts, + volumeServer := weed_server.NewVolumeServer(r, r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, *volumeFixJpgOrientation, ) @@ -235,7 +236,7 @@ func runServer(cmd *Command, args []string) bool { time.Duration(*serverTimeout)*time.Second, ) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Volume server listener error: %v", e) } OnInterrupt(func() { @@ -244,7 +245,7 @@ func runServer(cmd *Command, args []string) bool { }) if e := http.Serve(volumeListener, r); e != nil { - glog.Fatalf("Fail to serve:%s", e.Error()) + glog.Fatalf("Volume server fail to serve:%v", e) } return true diff --git a/go/weed/volume.go b/go/weed/volume.go index 1683e1927..1dfd88576 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -13,8 +13,42 @@ import ( "github.com/chrislusf/weed-fs/go/weed/weed_server" ) +var ( + v VolumeServerOptions +) + +type VolumeServerOptions struct { + port *int + adminPort *int + folders []string + folderMaxLimits []int + ip *string + publicIp *string + bindIp *string + master *string + pulseSeconds *int + idleConnectionTimeout *int + maxCpu *int + dataCenter *string + rack *string + whiteList []string + fixJpgOrientation *bool +} + func init() { cmdVolume.Run = runVolume // break init cycle + v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") + v.adminPort = cmdVolume.Flag.Int("port.admin", 8443, "https admin port, active when SSL certs are specified. Not ready yet.") + v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") + v.publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible ") + v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") + v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") + v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") + v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") + v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") } var cmdVolume = &Command{ @@ -26,76 +60,66 @@ var cmdVolume = &Command{ } var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - volumeSecurePort = cmdVolume.Flag.Int("port.secure", 8443, "https listen port, active when SSL certs are specified. Not ready yet.") volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") - ip = cmdVolume.Flag.String("ip", "", "ip or server name") - publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible ") - volumeBindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") - masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") - vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") - vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") - vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") - rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") - - volumeWhiteList []string ) func runVolume(cmd *Command, args []string) bool { - if *vMaxCpu < 1 { - *vMaxCpu = runtime.NumCPU() + if *v.maxCpu < 1 { + *v.maxCpu = runtime.NumCPU() } - runtime.GOMAXPROCS(*vMaxCpu) - folders := strings.Split(*volumeFolders, ",") + runtime.GOMAXPROCS(*v.maxCpu) + + //Set multiple folders and each folder's max volume count limit' + v.folders = strings.Split(*volumeFolders, ",") maxCountStrings := strings.Split(*maxVolumeCounts, ",") - maxCounts := make([]int, 0) for _, maxString := range maxCountStrings { if max, e := strconv.Atoi(maxString); e == nil { - maxCounts = append(maxCounts, max) + v.folderMaxLimits = append(v.folderMaxLimits, max) } else { glog.Fatalf("The max specified in -max not a valid number %s", maxString) } } - if len(folders) != len(maxCounts) { - glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts)) + if len(v.folders) != len(v.folderMaxLimits) { + glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) } - for _, folder := range folders { + for _, folder := range v.folders { if err := util.TestFolderWritable(folder); err != nil { glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) } } - if *publicIp == "" { - if *ip == "" { - *ip = "127.0.0.1" - *publicIp = "localhost" - } else { - *publicIp = *ip - } - } + //security related white list configuration if *volumeWhiteListOption != "" { - volumeWhiteList = strings.Split(*volumeWhiteListOption, ",") + v.whiteList = strings.Split(*volumeWhiteListOption, ",") + } + + //derive default public ip address + if *v.publicIp == "" { + if *v.ip == "" { + *v.ip = "127.0.0.1" + *v.publicIp = "localhost" + } else { + *v.publicIp = *v.ip + } } r := http.NewServeMux() - volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts, - *masterNode, *vpulse, *dataCenter, *rack, - volumeWhiteList, - *fixJpgOrientation, + volumeServer := weed_server.NewVolumeServer(r, r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits, + *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, + v.whiteList, + *v.fixJpgOrientation, ) - listeningAddress := *volumeBindIp + ":" + strconv.Itoa(*vport) + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress) - listener, e := util.NewListener(listeningAddress, time.Duration(*vTimeout)*time.Second) + listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { - glog.Fatalf(e.Error()) + glog.Fatalf("Volume server listener error:%v", e) } OnInterrupt(func() { @@ -103,7 +127,7 @@ func runVolume(cmd *Command, args []string) bool { }) if e := http.Serve(listener, r); e != nil { - glog.Fatalf("Fail to serve:%s", e.Error()) + glog.Fatalf("Volume server fail to serve: %v", e) } return true } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 44ffcce47..d259aa660 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -61,7 +61,7 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter // wrapper for writeJson - just logs errors func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { if err := writeJson(w, r, httpStatus, obj); err != nil { - glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error()) + glog.V(0).Infof("error writing JSON %s: %v", obj, err) } } func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go index 18a02b5e0..b43e1965b 100644 --- a/go/weed/weed_server/filer_server.go +++ b/go/weed/weed_server/filer_server.go @@ -45,7 +45,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle fs.filer = flat_namespace.NewFlatNamesapceFiler(master, redis_store) } else { if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { - glog.Fatalf("Can not start filer in dir %s : %v", err) + glog.Fatalf("Can not start filer in dir %s : %v", dir, err) return } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 437044eee..4d304efcb 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -121,7 +121,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) if machines != nil && len(machines) > 0 { http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) } else { - writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found.", volumeId)) + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d not found.", volumeId)) } } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 9ceeb0149..0eb9daa0e 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -22,7 +22,7 @@ type VolumeServer struct { FixJpgOrientation bool } -func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int, +func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int, masterNode string, pulseSeconds int, dataCenter string, rack string, whiteList []string, @@ -39,18 +39,18 @@ func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, fol vs.guard = security.NewGuard(whiteList, "") - r.HandleFunc("/status", vs.guard.Secure(vs.statusHandler)) - r.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler)) - r.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler)) - r.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler)) - r.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler)) - r.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler)) - r.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler)) - r.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler)) - r.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler)) - r.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler)) - r.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) - r.HandleFunc("/", vs.storeHandler) + adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler)) + adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler)) + adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler)) + adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler)) + publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler)) + publicMux.HandleFunc("/", vs.storeHandler) go func() { connected := true diff --git a/go/weed/weed_server/volume_server_handlers_helper.go b/go/weed/weed_server/volume_server_handlers_helper.go index 6d21ffb62..2bab35e45 100644 --- a/go/weed/weed_server/volume_server_handlers_helper.go +++ b/go/weed/weed_server/volume_server_handlers_helper.go @@ -93,7 +93,7 @@ func (w *countingWriter) Write(p []byte) (n int, err error) { return len(p), nil } -// rangesMIMESize returns the nunber of bytes it takes to encode the +// rangesMIMESize returns the number of bytes it takes to encode the // provided ranges as a multipart response. func rangesMIMESize(ranges []httpRange, contentType string, contentSize int64) (encSize int64) { var w countingWriter diff --git a/note/security.txt b/note/security.txt index 04030a574..c5482a569 100644 --- a/note/security.txt +++ b/note/security.txt @@ -3,7 +3,7 @@ Design for Seaweed-FS security Design Objectives Security can mean many different things. The original vision is that: if you have one machine lying around somewhere with some disk space, it should be able to join your file system to contribute some disk space and - network bandwidth. + network bandwidth, securely! To achieve this purpose, the security should be able to: 1. Secure the inter-server communication. Only real cluster servers can join and communicate. @@ -14,7 +14,7 @@ Non Objective User specific access control. Design Architect - master, and volume servers all talk securely via 2-way SSL for admin. + master, and volume servers all talk securely via 2-way SSL for admin operations. upon joining, master gives its secret key to volume servers. filer or clients talk to master to get secret key, and use the key to generate JWT to write on volume server. A side benefit: @@ -34,3 +34,18 @@ file uploading: when filer/clients wants to upload, master generate a JWT filer~>volume(public port) master~>volume(public port) + +Currently, volume server has 2 ip addresses: ip and publicUrl. + The ip is for admin purpose, and master talk to volume server this way. + The publicUrl is for clients to access the server, via http GET/POST/DELETE etc. + The write operations are secured by JWT. + clients talk to master also via https? possible. Decide on this later. + +Dev plan: + 1. volume server separate admin from public GET/POST/DELETE handlers + The step 1 may be good enough for most use cases. + + If 2-way ssl are still needed + 2. volume server add ssl support + 3. https connections to operate on volume servers +