Mirror of https://github.com/seaweedfs/seaweedfs.git (synced 2025-06-07 09:25:23 +08:00)
Pass the request context down from the filer HTTP handlers
parent a71dcd63f5
commit 4391ca1ebc
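The diff below threads a context.Context from the HTTP handlers down through operation.Assign and the Uploader methods, instead of creating context.Background() at each call site. Before the hunks, a minimal, self-contained sketch (not part of the commit) of the pattern being adopted; assign, uploadData, and postHandler are hypothetical stand-ins for the real SeaweedFS functions, which take many more parameters:

package main

import (
	"context"
	"fmt"
	"net/http"
)

// assign stands in for operation.Assign: with a ctx parameter, a cancelled
// request can now cancel the volume assignment as well.
func assign(ctx context.Context) (fileID string, err error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	return "3,01637037d6", nil // hypothetical file id
}

// uploadData stands in for Uploader.UploadData: request-scoped values
// (for example the X-Request-ID header) travel with ctx.
func uploadData(ctx context.Context, data []byte) error {
	_ = data
	return ctx.Err()
}

// postHandler mirrors the handler-side change in this commit: derive ctx
// from the incoming request instead of using context.Background().
func postHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	fid, err := assign(ctx)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := uploadData(ctx, []byte("hello")); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "uploaded as %s\n", fid)
}

func main() {
	http.HandleFunc("/submit", postHandler)
	_ = http.ListenAndServe(":8080", nil)
}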
@@ -1,13 +1,13 @@
 package main

 import (
+	"context"
 	"flag"
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"log"
 	"math/rand"
 	"time"
-	"context"

 	"google.golang.org/grpc"
@@ -56,7 +56,7 @@ func main() {
 }

 func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) {
-	assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
+	assignResult, err := operation.Assign(context.Background(), func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
 		Count: 1,
 		Replication: *replication,
 	})
@@ -84,7 +84,7 @@ func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, st
 		log.Fatalf("upload: %v", err)
 	}

-	_, err = uploader.UploadData(data, uploadOption)
+	_, err = uploader.UploadData(context.Background(), data, uploadOption)
 	if err != nil {
 		log.Fatalf("upload: %v", err)
 	}
@@ -241,7 +241,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
 			Replication: *b.replication,
 			DiskType: *b.diskType,
 		}
-		if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
+		if assignResult, err := operation.Assign(context.Background(), b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
 			fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
 			if !isSecure && assignResult.Auth != "" {
 				isSecure = true
@@ -58,7 +58,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
 		WritableVolumeCount: rule.VolumeGrowthCount,
 	}

-	assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
+	assignResult, err := operation.Assign(context.Background(), f.GetMaster, f.GrpcDialOption, assignRequest)
 	if err != nil {
 		return nil, nil, fmt.Errorf("AssignVolume: %v", err)
 	}
@@ -83,7 +83,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
 		return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
 	}

-	uploadResult, err := uploader.UploadData(data, uploadOption)
+	uploadResult, err := uploader.UploadData(context.Background(), data, uploadOption)
 	if err != nil {
 		return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
 	}
@@ -139,7 +139,7 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri
 	return
 }

-func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {

 	var requests []*VolumeAssignRequest
 	requests = append(requests, primaryRequest)
@@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
 			continue
 		}

-		lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+		lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
 			req := &master_pb.AssignRequest{
 				Count: request.Count,
 				Replication: request.Replication,
@@ -165,7 +165,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
 				DataNode: request.DataNode,
 				WritableVolumeCount: request.WritableVolumeCount,
 			}
-			resp, grpcErr := masterClient.Assign(context.Background(), req)
+			resp, grpcErr := masterClient.Assign(ctx, req)
 			if grpcErr != nil {
 				return grpcErr
 			}
@@ -60,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {

 func BenchmarkUnaryAssign(b *testing.B) {
 	for i := 0; i < b.N; i++ {
-		Assign(func(_ context.Context) pb.ServerAddress {
+		Assign(context.Background(), func(_ context.Context) pb.ServerAddress {
 			return pb.ServerAddress("localhost:9333")
 		}, grpc.WithInsecure(), &VolumeAssignRequest{
 			Count: 1,
@@ -2,6 +2,7 @@ package operation

 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"net/http"
@@ -58,7 +59,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
 			PairMap: nil,
 			Jwt: "",
 		}
-		uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
+		uploadResult, err, data := uploader.Upload(context.Background(), bytes.NewReader([]byte(textContent)), uploadOption)
 		if len(data) != len(textContent) {
 			t.Errorf("data actual %d expected %d", len(data), len(textContent))
 		}
@@ -86,7 +87,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
 			PairMap: nil,
 			Jwt: "",
 		}
-		uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
+		uploader.Upload(context.Background(), bytes.NewReader(gzippedData), uploadOption)
 	}

 	/*
@@ -62,7 +62,7 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*
 		Ttl: pref.Ttl,
 		DiskType: pref.DiskType,
 	}
-	ret, err := Assign(masterFn, grpcDialOption, ar)
+	ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar)
 	if err != nil {
 		for index := range files {
 			results[index].Error = err.Error()
@@ -155,7 +155,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
 			Ttl: fi.Pref.Ttl,
 			DiskType: fi.Pref.DiskType,
 		}
-		ret, err = Assign(masterFn, grpcDialOption, ar)
+		ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
 		if err != nil {
 			return
 		}
@@ -169,7 +169,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
 			Ttl: fi.Pref.Ttl,
 			DiskType: fi.Pref.DiskType,
 		}
-		ret, err = Assign(masterFn, grpcDialOption, ar)
+		ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
 		if err != nil {
 			// delete all uploaded chunks
 			cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
@@ -223,7 +223,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
 		return 0, e
 	}

-	ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
+	ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption)
 	if e != nil {
 		return 0, e
 	}
@@ -267,7 +267,7 @@ func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
 		return 0, uploaderError
 	}

-	uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
+	uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption)
 	if uploadError != nil {
 		return 0, uploadError
 	}
@@ -299,6 +299,6 @@ func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt secu
 		return e
 	}

-	_, e = uploader.UploadData(buf, uploadOption)
+	_, e = uploader.UploadData(context.Background(), buf, uploadOption)
 	return e
 }
@@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
 		uploadOption.Jwt = auth

 		var uploadErr error
-		uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
+		uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
 		return uploadErr
 	}
 	if uploadOption.RetryForever {
@@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
 }

 // Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
-	uploadResult, err = uploader.retriedUploadData(data, option)
+func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+	uploadResult, err = uploader.retriedUploadData(ctx, data, option)
 	return
 }

 // Upload sends a POST request to a volume server to upload the content with fast compression
-func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
-	uploadResult, err, data = uploader.doUpload(reader, option)
+func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+	uploadResult, err, data = uploader.doUpload(ctx, reader, option)
 	return
 }

-func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
 	bytesReader, ok := reader.(*util.BytesReader)
 	if ok {
 		data = bytesReader.Bytes
@@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo
 			return
 		}
 	}
-	uploadResult, uploadErr := uploader.retriedUploadData(data, option)
+	uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
 	return uploadResult, uploadErr, data
 }

-func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
 	for i := 0; i < 3; i++ {
 		if i > 0 {
 			time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
 		}
-		uploadResult, err = uploader.doUploadData(data, option)
+		uploadResult, err = uploader.doUploadData(ctx, data, option)
 		if err == nil {
 			uploadResult.RetryCount = i
 			return
@@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (
 	return
 }

-func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
 	contentIsGzipped := option.IsInputCompressed
 	shouldGzipNow := false
 	if !option.IsInputCompressed {
@@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
 		}

 		// upload data
-		uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+		uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
 			_, err = w.Write(encryptedData)
 			return
 		}, len(encryptedData), &UploadOption{
@@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
 		}
 	} else {
 		// upload data
-		uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+		uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
 			_, err = w.Write(data)
 			return
 		}, len(data), &UploadOption{
@@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
 	return uploadResult, err
 }

-func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
+func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
 	var body_writer *multipart.Writer
 	var reqReader *bytes.Reader
 	var buf *bytebufferpool.ByteBuffer
@@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er
 	if option.Jwt != "" {
 		req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
 	}
+
+	util.ReqWithRequestId(req, ctx)
+
 	// print("+")
 	resp, post_err := uploader.httpClient.Do(req)
 	defer util_http.CloseResponse(resp)
@@ -227,7 +227,7 @@ func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, exist
 		"",
 		"",
 	) // ignore readonly error for capacity needed to manifestize
-	chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
+	chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
 	if err != nil {
 		// not good, but should be ok
 		glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -276,7 +276,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 		glog.Warningf("detectStorageOption: %v", err)
 		return &filer_pb.AppendToEntryResponse{}, err
 	}
-	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks())
+	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
 	if err != nil {
 		// not good, but should be ok
 		glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -313,7 +313,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol

 	assignRequest, altRequest := so.ToAssignRequests(int(req.Count))

-	assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+	assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
 	if err != nil {
 		glog.V(3).Infof("AssignVolume: %v", err)
 		return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@@ -97,7 +97,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
 	}

 	// assign one volume server
-	assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+	assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
 	if err != nil {
 		fetchAndWriteErr = err
 		return
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/util"
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/mem"
 	"io"
@@ -53,6 +54,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques

 	proxyReq.Header.Set("Host", r.Host)
 	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+	util.ReqWithRequestId(proxyReq, r.Context())

 	for header, values := range r.Header {
 		for _, value := range values {
@@ -1,7 +1,6 @@
 package weed_server

 import (
-	"context"
 	"errors"
 	"net/http"
 	"strconv"
@@ -18,7 +17,7 @@ import (
 // sub directories are listed on the first page, when "lastFileName"
 // is empty.
 func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
-
+	ctx := r.Context()
 	if fs.option.ExposeDirectoryData == false {
 		writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled"))
 		return
@@ -40,7 +39,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 	namePattern := r.FormValue("namePattern")
 	namePatternExclude := r.FormValue("namePatternExclude")

-	entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
+	entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)

 	if err != nil {
 		glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
@@ -1,7 +1,6 @@
 package weed_server

 import (
-	"context"
 	"net/http"
 	"strings"

@@ -57,7 +56,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
 // curl -X DELETE http://localhost:8888/path/to/a/file?tagging
 func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {

-	ctx := context.Background()
+	ctx := r.Context()

 	path := r.URL.Path
 	if strings.HasSuffix(path, "/") {
@@ -34,7 +34,7 @@ type FilerPostResult struct {
 	Error string `json:"error,omitempty"`
 }

-func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {

 	stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc()
 	start := time.Now()
@@ -44,7 +44,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u

 	ar, altRequest := so.ToAssignRequests(1)

-	assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
+	assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
 	if ae != nil {
 		glog.Errorf("failing to assign a file id: %v", ae)
 		err = ae
@@ -306,7 +306,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	}

 	// maybe compact entry chunks
-	mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+	mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks)
 	if replyerr != nil {
 		glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
 		return
@@ -348,7 +348,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	return filerResult, replyerr
 }

-func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
+func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {

 	return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
 		var fileId string
@@ -356,7 +356,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs

 		err := util.Retry("saveAsChunk", func() error {
 			// assign one file id for one chunk
-			assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
+			assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
 			if assignErr != nil {
 				return assignErr
 			}
@@ -380,7 +380,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
 			}

 			var uploadErr error
-			uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
+			uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption)
 			if uploadErr != nil {
 				return uploadErr
 			}
@@ -19,7 +19,7 @@ import (
 // handling single chunk POST or PUT upload
 func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) {

-	fileId, urlLocation, auth, err := fs.assignNewFileInfo(so)
+	fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)

 	if err != nil || fileId == "" || urlLocation == "" {
 		return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
@@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 		return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
 	}

-	uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
+	uploadResult, uploadError := uploader.UploadData(ctx, uncompressedData, uploadOption)
 	if uploadError != nil {
 		return nil, fmt.Errorf("upload to volume server: %v", uploadError)
 	}
@@ -118,7 +118,7 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reade
 				wg.Done()
 			}()

-			chunks, toChunkErr := fs.dataToChunk(fileName, contentType, buf.Bytes(), offset, so)
+			chunks, toChunkErr := fs.dataToChunk(ctx, fileName, contentType, buf.Bytes(), offset, so)
 			if toChunkErr != nil {
 				uploadErrLock.Lock()
 				if uploadErr == nil {
@@ -162,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reade
 	return fileChunks, md5Hash, chunkOffset, nil, smallContent
 }

-func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {

 	stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc()
 	start := time.Now()
@@ -185,14 +185,14 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
 		return nil, err, []byte{}
 	}

-	uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
+	uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption)
 	if uploadResult != nil && uploadResult.RetryCount > 0 {
 		stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
 	}
 	return uploadResult, err, data
 }

-func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
+func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
 	dataReader := util.NewBytesReader(data)

 	// retry to assign a different file id
@@ -204,14 +204,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch

 	err := util.Retry("filerDataToChunk", func() error {
 		// assign one file id for one chunk
-		fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
+		fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so)
 		if uploadErr != nil {
 			glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
 			stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
 			return uploadErr
 		}
 		// upload the chunk to the volume server
-		uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
+		uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth)
 		if uploadErr != nil {
 			glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
 			stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
@@ -77,7 +77,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
 			return
 		}

-		if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
+		if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil {
 			err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
 		}
 	}(replica.Url)
@@ -16,6 +16,7 @@ import (
 )

 func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
 	if e := r.ParseForm(); e != nil {
 		glog.V(0).Infoln("form parse error:", e)
 		writeJsonError(w, r, http.StatusBadRequest, e)
@@ -45,7 +46,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	}

 	ret := operation.UploadResult{}
-	isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
+	isUnchanged, writeError := topology.ReplicatedWrite(ctx, vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
 	if writeError != nil {
 		writeJsonError(w, r, http.StatusInternalServerError, writeError)
 		return
@@ -351,7 +351,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
 		jwt = security.GenJwtForVolumeServer(security.SigningKey(signingKey), expiresAfterSec, toFid.String())
 	}

-	_, err, _ = uploader.Upload(reader, &operation.UploadOption{
+	_, err, _ = uploader.Upload(context.Background(), reader, &operation.UploadOption{
 		UploadUrl: uploadURL,
 		Filename: filename,
 		IsInputCompressed: isCompressed,
@@ -1,6 +1,7 @@
 package topology

 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -23,7 +24,7 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 )

-func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
+func ReplicatedWrite(ctx context.Context, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {

 	//check JWT
 	jwt := security.GetJwt(r)
@@ -121,7 +122,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
 				glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
 				return err
 			}
-			_, err = uploader.UploadData(n.Data, uploadOption)
+			_, err = uploader.UploadData(ctx, n.Data, uploadOption)
 			if err != nil {
 				glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
 			}
@@ -100,10 +100,6 @@ func maybeAddAuth(req *http.Request, jwt string) {
 	}
 }

-func reqWithRequestId(req *http.Request, ctx context.Context) {
-	req.Header.Set(util.RequestIdHttpHeader, util.GetRequestID(ctx))
-}
-
 func Delete(url string, jwt string) error {
 	req, err := http.NewRequest(http.MethodDelete, url, nil)
 	maybeAddAuth(req, jwt)
@@ -311,7 +307,7 @@ func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, ciph
 	} else {
 		req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
 	}
-	reqWithRequestId(req, ctx)
+	util.ReqWithRequestId(req, ctx)

 	r, err := GetGlobalHttpClient().Do(req)
 	if err != nil {
@@ -1,6 +1,9 @@
 package util

-import "context"
+import (
+	"context"
+	"net/http"
+)

 const (
 	RequestIdHttpHeader = "X-Request-ID"
@@ -18,3 +21,7 @@ func GetRequestID(ctx context.Context) string {
 func WithRequestID(ctx context.Context, id string) context.Context {
 	return context.WithValue(ctx, RequestIDKey, id)
 }
+
+func ReqWithRequestId(req *http.Request, ctx context.Context) {
+	req.Header.Set(RequestIdHttpHeader, GetRequestID(ctx))
+}
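The last hunks above move the request-id helper out of the HTTP client package (unexported reqWithRequestId) into the shared util package as ReqWithRequestId, so that the uploader, the filer proxy, and the HTTP client can all stamp outgoing requests with the X-Request-ID carried on the context. Below is a minimal, self-contained sketch (not part of the commit) of how that relocated helper behaves; the key type requestIDKey is a stand-in for the real package's RequestIDKey, and the URL and id values are made up for illustration:

package main

import (
	"context"
	"fmt"
	"net/http"
)

const RequestIdHttpHeader = "X-Request-ID"

// requestIDKey is a stand-in context key; the util package uses its own RequestIDKey.
type requestIDKey struct{}

// GetRequestID mirrors util.GetRequestID: read the id stored on the context,
// or return "" if none was set.
func GetRequestID(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey{}).(string)
	return id
}

// ReqWithRequestId copies the request id from ctx onto an outgoing request,
// matching the helper added to the util package in this commit.
func ReqWithRequestId(req *http.Request, ctx context.Context) {
	req.Header.Set(RequestIdHttpHeader, GetRequestID(ctx))
}

func main() {
	ctx := context.WithValue(context.Background(), requestIDKey{}, "req-1234")
	req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/3,01637037d6", nil)
	ReqWithRequestId(req, ctx)
	fmt.Println(req.Header.Get(RequestIdHttpHeader)) // prints: req-1234
}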