{{ range $entry := .Breadcrumbs }}
-
+
{{ $entry.Name }}
{{ end }}
@@ -78,20 +78,19 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`
{{end}}
-
+ |
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Mime }}
+ {{ $entry.Mime }}
{{end}}
|
-
+ |
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Size | humanizeBytes }}
-
+ {{ $entry.Size | humanizeBytes }}
{{end}}
|
-
+ |
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
|
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 82a190e39..fcfd98f7b 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer"
@@ -76,7 +77,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -164,9 +168,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ Leader: newLeader,
}); err != nil {
glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
return err
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index f8e0785f6..f02b0f242 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -57,8 +57,8 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
return deleteErr
@@ -77,8 +77,8 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
for _, server := range listOfEcServers {
- err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
return deleteErr
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 0580acf76..856c07890 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -5,10 +5,11 @@ import (
"fmt"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -52,7 +53,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
@@ -78,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
@@ -108,7 +109,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cde583560..b3cc310e6 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -14,6 +14,9 @@ import (
"time"
"github.com/chrislusf/raft"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -22,9 +25,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/gorilla/mux"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
+)
+
+const (
+ SequencerType = "master.sequencer.type"
+ SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
)
type MasterOption struct {
@@ -64,7 +69,7 @@ type MasterServer struct {
func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
- v := viper.GetViper()
+ v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
@@ -78,7 +83,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
- grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
+ grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
@@ -87,7 +92,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@@ -165,33 +174,41 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
func (ms *MasterServer) startAdminScripts() {
- v := viper.GetViper()
- adminScripts := v.GetString("master.maintenance.scripts")
- v.SetDefault("master.maintenance.sleep_minutes", 17)
- sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+ var err error
+ v := util.GetViper()
+ adminScripts := v.GetString("master.maintenance.scripts")
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
return
}
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/")
+ filerURL := v.GetString("master.filer.default_filer_url")
+
scriptLines := strings.Split(adminScripts, "\n")
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
var shellOptions shell.ShellOptions
- shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
+ shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost = "localhost"
- shellOptions.FilerPort = 8888
- shellOptions.Directory = "/"
+
+ shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL)
+ if err != nil {
+ glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err)
+ return
+ }
commandEnv := shell.NewCommandEnv(shellOptions)
@@ -230,3 +247,24 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ v := util.GetViper()
+ seqType := strings.ToLower(v.GetString(SequencerType))
+ glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "etcd":
+ var err error
+ urls := v.GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
+ default:
+ seq = sequence.NewMemorySequencer()
+ }
+ return seq
+}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index c10f9a5b7..514d86800 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -65,11 +65,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
var err error
if ms.Topo.IsLeader() {
volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
- machines := ms.Topo.Lookup(collection, volumeId)
- for _, loc := range machines {
- locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ if newVolumeIdErr != nil {
+ err = fmt.Errorf("Unknown volume id %s", vid)
+ } else {
+ machines := ms.Topo.Lookup(collection, volumeId)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ }
+ if locations == nil {
+ err = fmt.Errorf("volume id %s not found", vid)
+ }
}
- err = newVolumeIdErr
} else {
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
for _, loc := range machines {
@@ -94,6 +100,11 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
requestedCount = 1
}
+ writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount"))
+ if e != nil {
+ writableVolumeCount = 0
+ }
+
option, err := ms.getVolumeGrowOption(r)
if err != nil {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
@@ -108,7 +119,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 486bf31f4..44a04cb86 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -10,22 +10,23 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
+ collectionName := r.FormValue("collection")
+ collection, ok := ms.Topo.FindCollection(collectionName)
if !ok {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
+ writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
return deleteErr
@@ -35,7 +36,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
}
- ms.Topo.DeleteCollection(r.FormValue("collection"))
+ ms.Topo.DeleteCollection(collectionName)
+
+ w.WriteHeader(http.StatusNoContent)
+ return
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
@@ -53,6 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
gcThreshold, err = strconv.ParseFloat(gcString, 32)
if err != nil {
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
+ writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
return
}
}
@@ -140,7 +145,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if replicationString == "" {
replicationString = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString)
if err != nil {
return nil, err
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index c631d2535..43987b748 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -2,10 +2,12 @@ package weed_server
import (
"context"
+ "fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -96,6 +98,41 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
}
+func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
+
+ resp := &volume_server_pb.VolumeConfigureResponse{}
+
+ // check replication format
+ if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
+ resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
+ return resp, nil
+ }
+
+ // unmount
+ if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure unmount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
+ return resp, nil
+ }
+
+ // modify the volume info file
+ if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
+ glog.Errorf("volume configure %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
+ return resp, nil
+ }
+
+ // mount
+ if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure mount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
+ return resp, nil
+ }
+
+ return resp, nil
+
+}
+
func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 731675b48..dc47c2884 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -5,15 +5,17 @@ import (
"net"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
- "github.com/spf13/viper"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+
+ "golang.org/x/net/context"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "golang.org/x/net/context"
)
func (vs *VolumeServer) GetMaster() string {
@@ -25,7 +27,7 @@ func (vs *VolumeServer) heartbeat() {
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
var err error
var newLeader string
@@ -90,6 +92,9 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
vs.MetricsAddress = in.GetMetricsAddress()
vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
}
+ if len(in.StorageBackends) > 0 {
+ backend.LoadFromPbStorageBackends(in.StorageBackends)
+ }
}
}()
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 711a3ebad..6d74f8171 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -20,7 +20,7 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
-// VolumeCopy copy the .idx .dat files, and mount the volume
+// VolumeCopy copy the .idx .dat .vif files, and mount the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -41,7 +41,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
var volumeFileName, idxFileName, datFileName string
- err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
&volume_server_pb.ReadVolumeFileStatusRequest{
@@ -55,11 +55,15 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// println("source:", volFileInfoResp.String())
// copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
return err
}
@@ -70,12 +74,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
datFileName = volumeFileName + ".dat"
if err != nil && volumeFileName != "" {
- if idxFileName != "" {
- os.Remove(idxFileName)
- }
- if datFileName != "" {
- os.Remove(datFileName)
- }
+ os.Remove(idxFileName)
+ os.Remove(datFileName)
+ os.Remove(volumeFileName + ".vif")
return nil, err
}
@@ -95,15 +96,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
+ compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error {
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: vid,
- Ext: ext,
- CompactionRevision: compactRevision,
- StopOffset: stopOffset,
- Collection: collection,
- IsEcVolume: isEcVolume,
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -213,6 +215,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
}
if fileName == "" {
+ if req.IgnoreSourceFileNotFound {
+ return nil
+ }
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
}
}
@@ -221,6 +226,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
file, err := os.Open(fileName)
if err != nil {
+ if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
+ return nil
+ }
return err
}
defer file.Close()
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index f56fbeef4..6d6c3daa3 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -4,9 +4,9 @@ import (
"context"
"fmt"
"io"
- "os"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
@@ -30,7 +30,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
startOffset := foundOffset.ToAcutalOffset()
buf := make([]byte, 1024*1024*2)
- return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream)
+ return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
}
@@ -47,10 +47,10 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
}
-func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
var blockSizeLimit = int64(len(buf))
for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
- n, readErr := datFile.ReadAt(buf, startOffset+i)
+ n, readErr := datBackend.ReadAt(buf, startOffset+i)
if readErr == nil || readErr == io.EOF {
resp := &volume_server_pb.VolumeIncrementalCopyResponse{}
resp.FileContent = buf[:int64(n)]
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 8140a06f6..256e7c447 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,10 +8,12 @@ import (
"math"
"os"
"path"
+ "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -24,7 +26,7 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
-1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
+1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
2. client ask master for possible servers to hold the ec files, at least 4 servers
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
@@ -33,7 +35,7 @@ Steps to apply erasure coding to .dat .idx files
*/
-// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
+// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -47,19 +49,24 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
}
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ // write .vif files
+ if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
-// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
+// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
@@ -68,7 +75,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
baseFileName = path.Join(location.Directory, baseFileName)
if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
@@ -99,27 +106,36 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
- err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
- if !req.CopyEcxFile {
+ if req.CopyEcxFile {
+
+ // copy ecx file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ return err
+ }
return nil
}
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
- return err
+ if req.CopyEcjFile {
+ // copy ecj file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ return err
+ }
}
- // copy ecj file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
- return err
+ if req.CopyVifFile {
+ // copy vif file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ return err
+ }
}
return nil
@@ -137,6 +153,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+
found := false
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
@@ -153,21 +171,27 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, nil
}
- // check whether to delete the ecx file also
+ // check whether to delete the .ecx and .ecj file also
hasEcxFile := false
+ hasIdxFile := false
existingShardCount := 0
+ bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
for _, fileInfo := range fileInfos {
- if fileInfo.Name() == baseFilename+".ecx" {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
continue
}
- if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ if fileInfo.Name() == bName+".idx" {
+ hasIdxFile = true
+ continue
+ }
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
existingShardCount++
}
}
@@ -181,6 +205,10 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, err
}
}
+ if !hasIdxFile {
+ // .vif is used for ec volumes and normal volumes
+ os.Remove(baseFilename + ".vif")
+ }
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}
@@ -252,9 +280,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
startOffset, bytesToRead := req.Offset, req.Size
for bytesToRead > 0 {
- bytesread, err := ecShard.ReadAt(buffer, startOffset)
+ // min of bytesToRead and bufSize
+ bufferSize := bufSize
+ if bufferSize > bytesToRead {
+ bufferSize = bytesToRead
+ }
+ bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
- // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+ // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
if bytesread > 0 {
if int64(bytesread) > bytesToRead {
@@ -268,6 +301,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
return err
}
+ startOffset += int64(bytesread)
bytesToRead -= int64(bytesread)
}
@@ -311,3 +345,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
return resp, nil
}
+
+// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+
+ v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // calculate .dat file size
+ datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ }
+
+ // write .dat file from .ec00 ~ .ec09 files
+ if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .idx file from .ecx and .ecj files
+ if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
+}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index cb0d320ad..c26d6ed8f 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
@@ -71,7 +72,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
stream: stream,
}
- err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner)
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
return scanner.lastProcessedTimestampNs, err
@@ -101,7 +102,7 @@ type VolumeFileScanner4Tailing struct {
lastProcessedTimestampNs uint64
}
-func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go
new file mode 100644
index 000000000..7b3982e40
--- /dev/null
+++ b/weed/server/volume_grpc_tier_download.go
@@ -0,0 +1,85 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server
+func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ storageName, storageKey := v.RemoteStorageNameKey()
+ if storageName == "" || storageKey == "" {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check whether the local .dat already exists
+ _, ok := v.DataBackend.(*backend.DiskFile)
+ if ok {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[storageName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys)
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+ // copy the data file
+ _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
+ }
+
+ if req.KeepRemoteDatFile {
+ return nil
+ }
+
+ // remove remote file
+ if err := backendStorage.DeleteFile(storageKey); err != nil {
+ return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err)
+ }
+
+ // forget remote file
+ v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:]
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ v.DataBackend.Close()
+ v.DataBackend = nil
+
+ return nil
+}
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
new file mode 100644
index 000000000..c9694df59
--- /dev/null
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -0,0 +1,100 @@
+package weed_server
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatToRemote copy dat file to a remote tier
+func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ diskFile, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
+ }
+
+ // check whether the existing backend storage is the same as requested
+ // if same, skip
+ backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
+ for _, remoteFile := range v.GetVolumeInfo().GetFiles() {
+ if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
+ return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
+ }
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+
+ // remember the file original source
+ attributes := make(map[string]string)
+ attributes["volumeId"] = v.Id.String()
+ attributes["collection"] = v.Collection
+ attributes["ext"] = ".dat"
+ // copy the data file
+ key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
+ }
+
+ // save the remote file to volume tier info
+ v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
+ BackendType: backendType,
+ BackendId: backendId,
+ Key: key,
+ Offset: 0,
+ FileSize: uint64(size),
+ ModifiedTime: uint64(time.Now().Unix()),
+ Extension: ".dat",
+ })
+
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ if err := v.LoadRemoteFile(); err != nil {
+ return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
+ }
+
+ if !req.KeepLocalDatFile {
+ os.Remove(v.FileName() + ".dat")
+ }
+
+ return nil
+}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 6cf654738..0fdcf662a 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -4,13 +4,14 @@ import (
"fmt"
"net/http"
- "github.com/chrislusf/seaweedfs/weed/stats"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/spf13/viper"
)
type VolumeServer struct {
@@ -29,6 +30,7 @@ type VolumeServer struct {
compactionBytePerSecond int64
MetricsAddress string
MetricsIntervalSec int
+ fileSizeLimitBytes int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -41,9 +43,10 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fixJpgOrientation bool,
readRedirect bool,
compactionMBPerSecond int,
+ fileSizeLimitMB int,
) *VolumeServer {
- v := viper.GetViper()
+ v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
@@ -60,8 +63,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+ fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
}
vs.SeedMasterNodes = masterNodes
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 25b6582f7..1938a34c4 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -12,7 +12,7 @@ import (
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
- m["Volumes"] = vs.store.Status()
+ m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index cd11356b9..d89d13a0d 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -54,7 +54,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
- glog.V(4).Infoln("volume", volumeId, "reading", n)
+ // glog.V(4).Infoln("volume", volumeId, "reading", n)
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
@@ -88,7 +88,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
}
- glog.V(4).Infoln("read bytes", count, "error", err)
+ // glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 852f0b751..8d35c9c8b 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
ui "github.com/chrislusf/seaweedfs/weed/server/volume_server_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -20,19 +21,30 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
ds = append(ds, stats.NewDiskStatus(dir))
}
}
+ volumeInfos := vs.store.VolumeInfos()
+ var normalVolumeInfos, remoteVolumeInfos []*storage.VolumeInfo
+ for _, vinfo := range volumeInfos {
+ if vinfo.IsRemote() {
+ remoteVolumeInfos = append(remoteVolumeInfos, vinfo)
+ } else {
+ normalVolumeInfos = append(normalVolumeInfos, vinfo)
+ }
+ }
args := struct {
- Version string
- Masters []string
- Volumes interface{}
- EcVolumes interface{}
- DiskStatuses interface{}
- Stats interface{}
- Counters *stats.ServerStats
+ Version string
+ Masters []string
+ Volumes interface{}
+ EcVolumes interface{}
+ RemoteVolumes interface{}
+ DiskStatuses interface{}
+ Stats interface{}
+ Counters *stats.ServerStats
}{
util.VERSION,
vs.SeedMasterNodes,
- vs.store.Status(),
+ normalVolumeInfos,
vs.store.EcVolumes(),
+ remoteVolumeInfos,
ds,
infos,
serverStats,
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index db8fcb555..cd35255e5 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -43,7 +43,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+ needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
@@ -51,10 +51,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret := operation.UploadResult{}
_, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
- httpStatus := http.StatusCreated
- if isUnchanged {
- httpStatus = http.StatusNotModified
+
+ // http 304 status code does not allow body
+ if writeError == nil && isUnchanged {
+ w.WriteHeader(http.StatusNotModified)
+ return
}
+
+ httpStatus := http.StatusCreated
if writeError != nil {
httpStatus = http.StatusInternalServerError
ret.Error = writeError.Error()
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index eafc0aaeb..81496b1de 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -107,10 +107,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`
Id |
Collection |
-
Size |
+
Data Size |
Files |
Trash |
TTL |
+
ReadOnly |
@@ -122,6 +123,37 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`{{ .FileCount }}
{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes |
{{ .Ttl }} |
+ {{ .ReadOnly }} |
+
+ {{ end }}
+
+
+
+
+