mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-11-25 19:49:10 +08:00
7edbee6f57
the panic is triggered by uploading a file to a volume server not holding the designated replica. 2020-03-15 10:20:14.365488 I | http: panic serving 127.0.0.1:57124: runtime error: invalid memory address or nil pointer dereference goroutine 119 [running]: net/http.(*conn).serve.func1(0xc0001a8000) /home/travis/.gimme/versions/go1.14.linux.amd64/src/net/http/server.go:1772 +0x139 panic(0x2316fe0, 0x3662900) /home/travis/.gimme/versions/go1.14.linux.amd64/src/runtime/panic.go:973 +0x396 github.com/chrislusf/seaweedfs/weed/topology.getWritableRemoteReplications(0xc00009c000, 0x2, 0x7ffeefbffbd2, 0xe, 0x0, 0xa, 0x0, 0x0, 0xbb4bf1f7) /home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/topology/store_replicate.go:157 +0x53 github.com/chrislusf/seaweedfs/weed/topology.ReplicatedWrite(0x7ffeefbffbd2, 0xe, 0xc00009c000, 0xc000000002, 0xc000472750, 0xc0001b2200, 0x0, 0x1, 0x0) /home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/topology/store_replicate.go:29 +0xc7 github.com/chrislusf/seaweedfs/weed/server.(*VolumeServer).PostHandler(0xc0001513f0, 0x292bde0, 0xc0001fe2a0, 0xc0001b2200) /home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/server/volume_server_handlers_write.go:52 +0x56f github.com/chrislusf/seaweedfs/weed/server.(*VolumeServer).privateStoreHandler(0xc0001513f0, 0x292bde0, 0xc0001fe2a0, 0xc0001b2200) /home/travis/gopath/src/github.com/chrislusf/seaweedfs/weed/server/volume_server_handlers.go:37 +0x21f net/http.HandlerFunc.ServeHTTP(0xc0004420e0, 0x292bde0, 0xc0001fe2a0, 0xc0001b2200) /home/travis/.gimme/versions/go1.14.linux.amd64/src/net/http/server.go:2012 +0x44 net/http.(*ServeMux).ServeHTTP(0xc0001fc800, 0x292bde0, 0xc0001fe2a0, 0xc0001b2200) /home/travis/.gimme/versions/go1.14.linux.amd64/src/net/http/server.go:2387 +0x1a5 net/http.serverHandler.ServeHTTP(0xc0001781c0, 0x292bde0, 0xc0001fe2a0, 0xc0001b2200) /home/travis/.gimme/versions/go1.14.linux.amd64/src/net/http/server.go:2807 +0xa3 net/http.(*conn).serve(0xc0001a8000, 
0x2934420, 0xc000212400) /home/travis/.gimme/versions/go1.14.linux.amd64/src/net/http/server.go:1895 +0x86c created by net/http.(*Server).Serve /home/travis/.gimme/versions/go1.14.linux.amd64/src/net/http/server.go:2933 +0x35c Example: server A (datacenter 1) and server B (datacenter 2) hold the replicas (100) for volume 1. Uploading a file with a key 1,xxxxx to server C (datacenter 3) triggers the panic on server C. Server C should either proxy the upload to the correct volume server or return an HTTP error code instead of panicking.
188 lines
4.8 KiB
Go
188 lines
4.8 KiB
Go
package topology
|
|
|
|
import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/util"
)
|
|
|
|
func ReplicatedWrite(masterNode string, s *storage.Store,
|
|
volumeId needle.VolumeId, n *needle.Needle,
|
|
r *http.Request) (size uint32, isUnchanged bool, err error) {
|
|
|
|
//check JWT
|
|
jwt := security.GetJwt(r)
|
|
|
|
var remoteLocations []operation.Location
|
|
if r.FormValue("type") != "replicate" {
|
|
remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
|
|
if err != nil {
|
|
glog.V(0).Infoln(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to write to local disk: %v", err)
|
|
glog.V(0).Infoln(err)
|
|
return
|
|
}
|
|
|
|
if len(remoteLocations) > 0 { //send to other replica locations
|
|
if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
|
|
u := url.URL{
|
|
Scheme: "http",
|
|
Host: location.Url,
|
|
Path: r.URL.Path,
|
|
}
|
|
q := url.Values{
|
|
"type": {"replicate"},
|
|
"ttl": {n.Ttl.String()},
|
|
}
|
|
if n.LastModified > 0 {
|
|
q.Set("ts", strconv.FormatUint(n.LastModified, 10))
|
|
}
|
|
if n.IsChunkedManifest() {
|
|
q.Set("cm", "true")
|
|
}
|
|
u.RawQuery = q.Encode()
|
|
|
|
pairMap := make(map[string]string)
|
|
if n.HasPairs() {
|
|
tmpMap := make(map[string]string)
|
|
err := json.Unmarshal(n.Pairs, &tmpMap)
|
|
if err != nil {
|
|
glog.V(0).Infoln("Unmarshal pairs error:", err)
|
|
}
|
|
for k, v := range tmpMap {
|
|
pairMap[needle.PairNamePrefix+k] = v
|
|
}
|
|
}
|
|
|
|
// volume server do not know about encryption
|
|
_, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt)
|
|
return err
|
|
}); err != nil {
|
|
size = 0
|
|
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
|
|
glog.V(0).Infoln(err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func ReplicatedDelete(masterNode string, store *storage.Store,
|
|
volumeId needle.VolumeId, n *needle.Needle,
|
|
r *http.Request) (size uint32, err error) {
|
|
|
|
//check JWT
|
|
jwt := security.GetJwt(r)
|
|
|
|
var remoteLocations []operation.Location
|
|
if r.FormValue("type") != "replicate" {
|
|
remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
|
|
if err != nil {
|
|
glog.V(0).Infoln(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
size, err = store.DeleteVolumeNeedle(volumeId, n)
|
|
if err != nil {
|
|
glog.V(0).Infoln("delete error:", err)
|
|
return
|
|
}
|
|
|
|
if len(remoteLocations) > 0 { //send to other replica locations
|
|
if err = distributedOperation(remoteLocations, store, func(location operation.Location) error {
|
|
return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
|
|
}); err != nil {
|
|
size = 0
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// DistributedOperationResult records, per host, the error returned by an
// operation that was run against that host. A nil value means the operation
// succeeded on that host.
type DistributedOperationResult map[string]error

// Error folds all recorded failures into a single error, one "[host]: error"
// line per failed host. It returns nil when no host reported an error
// (including for an empty or nil result).
//
// Lines are ordered by host name. Go randomizes map iteration order, so
// without the sort the same set of failures would produce a differently
// ordered message on every call.
func (dr DistributedOperationResult) Error() error {
	failed := make([]string, 0, len(dr))
	for host, err := range dr {
		if err != nil {
			failed = append(failed, host)
		}
	}
	if len(failed) == 0 {
		return nil
	}
	sort.Strings(failed)

	lines := make([]string, len(failed))
	for i, host := range failed {
		lines[i] = fmt.Sprintf("[%s]: %v", host, dr[host])
	}
	return errors.New(strings.Join(lines, "\n"))
}
|
|
|
|
// RemoteResult is the outcome of running one operation against one remote
// location.
type RemoteResult struct {
	// Host identifies the location the operation ran against.
	Host string
	// Error is the error returned by the operation; nil on success.
	Error error
}
|
|
|
|
func distributedOperation(locations []operation.Location, store *storage.Store, op func(location operation.Location) error) error {
|
|
length := len(locations)
|
|
results := make(chan RemoteResult)
|
|
for _, location := range locations {
|
|
go func(location operation.Location, results chan RemoteResult) {
|
|
results <- RemoteResult{location.Url, op(location)}
|
|
}(location, results)
|
|
}
|
|
ret := DistributedOperationResult(make(map[string]error))
|
|
for i := 0; i < length; i++ {
|
|
result := <-results
|
|
ret[result.Host] = result.Error
|
|
}
|
|
|
|
return ret.Error()
|
|
}
|
|
|
|
func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) (
|
|
remoteLocations []operation.Location, err error) {
|
|
|
|
v := s.GetVolume(volumeId)
|
|
if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 {
|
|
return
|
|
}
|
|
|
|
// not on local store, or has replications
|
|
lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String())
|
|
if lookupErr == nil {
|
|
selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
|
|
for _, location := range lookupResult.Locations {
|
|
if location.Url != selfUrl {
|
|
remoteLocations = append(remoteLocations, location)
|
|
}
|
|
}
|
|
} else {
|
|
err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
|
|
return
|
|
}
|
|
|
|
if v != nil {
|
|
// has one local and has remote replications
|
|
copyCount := v.ReplicaPlacement.GetCopyCount()
|
|
if len(lookupResult.Locations) < copyCount {
|
|
err = fmt.Errorf("replicating opetations [%d] is less than volume %d replication copy count [%d]",
|
|
len(lookupResult.Locations), volumeId, copyCount)
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|