diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 27876272c..52368f8cd 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -40,6 +41,8 @@ that has those volumes. */ func main() { flag.Parse() + util_http.NewGlobalHttpClient() + fileName := strconv.Itoa(*fixVolumeId) if *fixVolumeCollection != "" { fileName = *fixVolumeCollection + "_" + fileName diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 9433af147..e289fefe8 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -20,6 +20,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -40,7 +41,8 @@ var ( */ func main() { flag.Parse() - + util_http.InitGlobalHttpClient() + util.LoadSecurityConfiguration() grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 760fc79ca..164b5b238 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -36,6 +37,8 @@ The .idx has all correct offsets. */ func main() { flag.Parse() + util_http.InitGlobalHttpClient() + fileName := strconv.Itoa(*fixVolumeId) if *fixVolumeCollection != "" { fileName = *fixVolumeCollection + "_" + fileName diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go index 2eeb5d6f9..2b63d5d59 100644 --- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go +++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -23,8 +24,8 @@ var ( ) func main() { - flag.Parse() + util_http.InitGlobalHttpClient() if *isWrite { startGenerateMetadata() diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go index 354707c81..cfac97432 100644 --- a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go +++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -71,6 +72,7 @@ func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset in func main() { flag.Parse() + util_http.InitGlobalHttpClient() vid := needle.VolumeId(*volumeId) diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 65ec94627..1f89bd902 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -25,6 +26,7 @@ var ( func main() { flag.Parse() + util_http.InitGlobalHttpClient() util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") @@ -34,7 +36,7 @@ func main() { go func() { for { println("vacuum threshold", *garbageThreshold) - _, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", pb.ServerAddress(*master).ToHttpAddress(), *garbageThreshold)) + _, _, err := util_http.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", pb.ServerAddress(*master).ToHttpAddress(), *garbageThreshold)) if err != nil { log.Fatalf("vacuum: %v", err) } @@ -47,7 +49,7 @@ func main() { assignResult, targetUrl := genFile(grpcDialOption, i) - util.Delete(targetUrl, string(assignResult.Auth)) + util_http.Delete(targetUrl, string(assignResult.Auth)) } @@ -76,7 +78,13 @@ func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, st PairMap: nil, Jwt: assignResult.Auth, } - _, err = operation.UploadData(data, uploadOption) + + uploader, err := operation.NewUploader() + if err != nil { + log.Fatalf("upload: %v", err) + } + + _, err = uploader.UploadData(data, uploadOption) if err != nil { log.Fatalf("upload: %v", err) } diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go index ba135ff25..1e591dff2 100644 --- a/unmaintained/s3/presigned_put/presigned_put.go +++ b/unmaintained/s3/presigned_put/presigned_put.go @@ -7,10 +7,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" - "github.com/seaweedfs/seaweedfs/weed/util" "net/http" "strings" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) // Downloads an item from an S3 Bucket in the region configured in the shared config @@ -21,6 +21,8 @@ import ( // For this exampl to work, the domainName is needd // weed s3 -domainName=localhost func main() { + util_http.InitGlobalHttpClient() + h := md5.New() content := strings.NewReader(stringContent) content.WriteTo(h) @@ -64,7 +66,7 @@ func main() { fmt.Printf("error put request: %v\n", err) return } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) fmt.Printf("response: %+v\n", resp) } diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index 1b2f0bb6a..a60e45760 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -40,6 +41,7 @@ func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset in func main() { flag.Parse() + util_http.InitGlobalHttpClient() vid := needle.VolumeId(*volumeId) diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go index 856d96d54..87f00ebb0 100644 --- a/unmaintained/see_idx/see_idx.go +++ b/unmaintained/see_idx/see_idx.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/types" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -27,6 +28,8 @@ This is to see content in .idx files. */ func main() { flag.Parse() + util_http.InitGlobalHttpClient() + fileName := strconv.Itoa(*fixVolumeId) if *fixVolumeCollection != "" { fileName = *fixVolumeCollection + "_" + fileName diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go index d5deff283..42a63476b 100644 --- a/unmaintained/see_log_entry/see_log_entry.go +++ b/unmaintained/see_log_entry/see_log_entry.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -20,6 +21,7 @@ var ( func main() { flag.Parse() + util_http.InitGlobalHttpClient() dst, err := os.OpenFile(*logdataFile, os.O_RDONLY, 0644) if err != nil { diff --git a/unmaintained/see_meta/see_meta.go b/unmaintained/see_meta/see_meta.go index 6fc88358c..da78f0918 100644 --- a/unmaintained/see_meta/see_meta.go +++ b/unmaintained/see_meta/see_meta.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -19,6 +20,7 @@ var ( func main() { flag.Parse() + util_http.InitGlobalHttpClient() dst, err := os.OpenFile(*metaFile, os.O_RDONLY, 0644) if err != nil { diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go index 2737962f2..cfdb36815 100644 --- a/unmaintained/stream_read_volume/stream_read_volume.go +++ b/unmaintained/stream_read_volume/stream_read_volume.go @@ -13,6 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -23,6 +24,7 @@ var ( func main() { flag.Parse() + util_http.InitGlobalHttpClient() util.LoadSecurityConfiguration() grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go index 7700a6dce..6dc703dbc 100644 --- a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go +++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -30,8 +31,8 @@ type stat struct { } func main() { - flag.Parse() + util_http.InitGlobalHttpClient() data := make([]byte, *size) println("data len", len(data)) @@ -43,16 +44,12 @@ func main() { go func(x int) { defer wg.Done() - client := &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x))) for t := 0; t < *times; t++ { for f := 0; f < *fileCount; f++ { fn := r.Intn(*fileCount) - if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil { + if size, err := uploadFileToFiler(data, fmt.Sprintf("file%04d", fn), *destination); err == nil { statsChan <- stat{ size: size, } @@ -93,7 +90,7 @@ func main() { } -func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) { +func uploadFileToFiler(data []byte, filename, destination string) (size int64, err error) { if !strings.HasSuffix(destination, "/") { destination = destination + "/" @@ -116,10 +113,13 @@ func uploadFileToFiler(client *http.Client, data []byte, filename, destination s uri := destination + filename request, err := http.NewRequest(http.MethodPost, uri, body) + if err != nil { + return 0, fmt.Errorf("http POST %s: %v", uri, err) + } request.Header.Set("Content-Type", writer.FormDataContentType()) // request.Close = true // can not use this, which do not reuse http connection, impacting filer->volume also. - resp, err := client.Do(request) + resp, err := util_http.GetGlobalHttpClient().Do(request) if err != nil { return 0, fmt.Errorf("http POST %s: %v", uri, err) } else { diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go index c8d36053b..1cdcad0b3 100644 --- a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go +++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go @@ -14,6 +14,7 @@ import ( "strings" "sync" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -30,8 +31,8 @@ type stat struct { } func main() { - flag.Parse() + util_http.InitGlobalHttpClient() var fileNames []string @@ -51,8 +52,6 @@ func main() { for x := 0; x < *concurrency; x++ { wg.Add(1) - client := &http.Client{} - go func() { defer wg.Done() rand.Shuffle(len(fileNames), func(i, j int) { @@ -60,7 +59,7 @@ func main() { }) for t := 0; t < *times; t++ { for _, filename := range fileNames { - if size, err := uploadFileToFiler(client, filename, *destination); err == nil { + if size, err := uploadFileToFiler(filename, *destination); err == nil { statsChan <- stat{ size: size, } @@ -99,7 +98,7 @@ func main() { } -func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) { +func uploadFileToFiler(filename, destination string) (size int64, err error) { file, err := os.Open(filename) if err != nil { panic(err) @@ -131,9 +130,13 @@ func uploadFileToFiler(client *http.Client, filename, destination string) (size uri := destination + file.Name() request, err := http.NewRequest(http.MethodPost, uri, body) + if err != nil { + return 0, fmt.Errorf("http POST %s: %v", uri, err) + } + request.Header.Set("Content-Type", writer.FormDataContentType()) - resp, err := client.Do(request) + resp, err := util_http.GetGlobalHttpClient().Do(request) if err != nil { return 0, fmt.Errorf("http POST %s: %v", uri, err) } else { diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index c210db81f..a75a095d4 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" util2 "github.com/seaweedfs/seaweedfs/weed/util" "golang.org/x/tools/godoc/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -24,6 +25,7 @@ var ( func main() { flag.Parse() + util_http.InitGlobalHttpClient() util2.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util2.GetViper(), "grpc.client") diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 0cd9d31c5..bc7ee1292 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -22,6 +22,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) type BenchmarkOptions struct { @@ -214,7 +215,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if isSecure { jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid) } - if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { + if e := util_http.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { s.completed++ } else { s.failed++ @@ -295,7 +296,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { } var bytes []byte for _, url := range urls { - bytes, _, err = util.Get(url) + bytes, _, err = util_http.Get(url) if err == nil { break } diff --git a/weed/command/download.go b/weed/command/download.go index 1032dcb62..1b7098824 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -63,11 +64,11 @@ func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpti if lookupError != nil { return lookupError } - filename, _, rc, err := util.DownloadFile(fileUrl, jwt) + filename, _, rc, err := util_http.DownloadFile(fileUrl, jwt) if err != nil { return err } - defer util.CloseResponse(rc) + defer util_http.CloseResponse(rc) if filename == "" { filename = fileId } @@ -116,10 +117,10 @@ func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption return "", nil, lookupError } var rc *http.Response - if filename, _, rc, e = util.DownloadFile(fileUrl, jwt); e != nil { + if filename, _, rc, e = util_http.DownloadFile(fileUrl, jwt); e != nil { return "", nil, e } - defer util.CloseResponse(rc) + defer util_http.CloseResponse(rc) content, e = io.ReadAll(rc.Body) return } diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 59cd5491d..8f6cb233e 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -344,7 +344,12 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return err } - finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return uploaderErr + } + + finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( worker, &filer_pb.AssignVolumeRequest{ Count: 1, @@ -423,7 +428,13 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, <-concurrentChunks }() - fileId, uploadResult, err, _ := operation.UploadWithRetry( + uploader, err := operation.NewUploader() + if err != nil { + uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err) + return + } + + fileId, uploadResult, err, _ := uploader.UploadWithRetry( worker, &filer_pb.AssignVolumeRequest{ Count: 1, @@ -535,8 +546,12 @@ func detectMimeType(f *os.File) string { } func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return nil, fmt.Errorf("upload data: %v", uploaderErr) + } - finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( worker, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml index 687854264..113e5b016 100644 --- a/weed/command/scaffold/security.toml +++ b/weed/command/scaffold/security.toml @@ -94,10 +94,14 @@ allowed_commonNames = "" # comma-separated SSL certificate common names [grpc.client] cert = "" key = "" -# Note: work in progress! -# this does not work with other clients, e.g., "weed filer|mount" etc, yet. + +# https client for master|volume|filer|etc connection +# It is necessary that the parameters [https.volume]|[https.master]|[https.filer] are set [https.client] enabled = true +cert = "" +key = "" +ca = "" # volume server https options [https.volume] diff --git a/weed/command/update.go b/weed/command/update.go index 314a903f2..4f2b66b2e 100644 --- a/weed/command/update.go +++ b/weed/command/update.go @@ -21,6 +21,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" "golang.org/x/net/context/ctxhttp" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) //copied from https://github.com/restic/restic/tree/master/internal/selfupdate @@ -198,7 +199,7 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R if err != nil { return Release{}, err } - defer util.CloseResponse(res) + defer util_http.CloseResponse(res) if res.StatusCode != http.StatusOK { content := res.Header.Get("Content-Type") @@ -258,7 +259,7 @@ func getGithubData(ctx context.Context, url string) ([]byte, error) { if err != nil { return nil, err } - defer util.CloseResponse(res) + defer util_http.CloseResponse(res) if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 7ea2f0353..e9ae1800c 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) const ( @@ -120,7 +121,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return 0, err } - return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) + return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { @@ -132,7 +133,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri for _, urlString := range urlStrings { var localProcessed int var writeErr error - shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { if totalWritten > localProcessed { toBeSkipped := totalWritten - localProcessed if len(data) <= toBeSkipped { diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index 66ce24871..3c9a3496c 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -77,7 +77,13 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi PairMap: nil, Jwt: assignResult.Auth, } - uploadResult, err := operation.UploadData(data, uploadOption) + + uploader, err := operation.NewUploader() + if err != nil { + return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + + uploadResult, err := uploader.UploadData(data, uploadOption) if err != nil { return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index 7be54b193..fea2bbc89 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -2,7 +2,6 @@ package filer import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/util" "sync" "sync/atomic" "time" @@ -10,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) type ReaderCache struct { @@ -171,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() { s.data = mem.Allocate(s.chunkSize) - _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) if s.err != nil { mem.Free(s.data) s.data = nil diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 23a853b9a..fdb443b53 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -16,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var getLookupFileIdBackoffSchedule = []time.Duration{ @@ -194,7 +195,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer return err } - n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) + n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) if err != nil { return err } @@ -350,7 +351,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go index 4c8470245..77ad01b89 100644 --- a/weed/mount/weedfs_write.go +++ b/weed/mount/weedfs_write.go @@ -14,8 +14,12 @@ import ( func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { + uploader, err := operation.NewUploader() + if err != nil { + return + } - fileId, uploadResult, err, data := operation.UploadWithRetry( + fileId, uploadResult, err, data := uploader.UploadWithRetry( wfs, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index cb55e2032..c8a08eb18 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -13,6 +13,7 @@ import ( "math" "sync/atomic" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType { @@ -131,7 +132,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p for _, urlString := range urlStrings { // TODO optimization opportunity: reuse the buffer var data []byte - if data, _, err = util.Get(urlString); err == nil { + if data, _, err = util_http.Get(urlString); err == nil { processed = true if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil { return diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 896f0ee75..9999529fb 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -55,7 +55,13 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) { reader := util.NewBytesReader(data) - fileId, uploadResult, err, _ = operation.UploadWithRetry( + + uploader, err := operation.NewUploader() + if err != nil { + return + } + + fileId, uploadResult, err, _ = uploader.UploadWithRetry( b, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go index 096b355a1..3ab3cb251 100644 --- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go +++ b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -45,6 +46,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { func main() { flag.Parse() + util_http.InitGlobalHttpClient() config := &pub_client.PublisherConfiguration{ Topic: topic.NewTopic(*namespace, *t), diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go index a5fbd455e..f340dd1c8 100644 --- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go +++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -86,6 +87,7 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { func main() { flag.Parse() + util_http.InitGlobalHttpClient() recordType := schema.RecordTypeBegin(). WithField("key", schema.TypeBytes). diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index adcdda04c..fa0e10579 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "strings" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -23,6 +24,7 @@ var ( func main() { flag.Parse() + util_http.InitGlobalHttpClient() subscriberConfig := &sub_client.SubscriberConfiguration{ ClientId: fmt.Sprintf("client-%d", *clientId), diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 53eb4f15b..914021dbc 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -13,6 +13,7 @@ import ( "google.golang.org/protobuf/proto" "strings" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -49,6 +50,7 @@ func FromSchemaRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { func main() { flag.Parse() + util_http.InitGlobalHttpClient() subscriberConfig := &sub_client.SubscriberConfiguration{ ClientId: fmt.Sprintf("client-%d", *clientId), diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 02faf9904..50313a670 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -103,11 +104,11 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) } - resp, err := util.Do(req) + resp, err := util_http.Do(req) if err != nil { return written, err } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) switch resp.StatusCode { case http.StatusRequestedRangeNotSatisfiable: diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 07b0153a9..b4bac5976 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -38,15 +38,11 @@ If the content is already compressed, need to know the content size. */ func TestCreateNeedleFromRequest(t *testing.T) { - mc := &MockClient{} - tmp := HttpClient - HttpClient = mc - defer func() { - HttpClient = tmp - }() + mockClient := &MockClient{} + uploader := newUploader(mockClient) { - mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") @@ -62,7 +58,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption) + uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption) if len(data) != len(textContent) { t.Errorf("data actual %d expected %d", len(data), len(textContent)) } @@ -73,7 +69,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { } { - mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") @@ -90,7 +86,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - Upload(bytes.NewReader(gzippedData), uploadOption) + uploader.Upload(bytes.NewReader(gzippedData), uploadOption) } /* diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 57bd81b14..516478dbe 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -217,7 +217,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw PairMap: nil, Jwt: jwt, } - ret, e, _ := Upload(fi.Reader, uploadOption) + + uploader, e := NewUploader() + if e != nil { + return 0, e + } + + ret, e, _ := uploader.Upload(fi.Reader, uploadOption) if e != nil { return 0, e } @@ -239,7 +245,13 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, PairMap: nil, Jwt: jwt, } - uploadResult, uploadError, _ := Upload(reader, uploadOption) + + uploader, uploaderError := NewUploader() + if uploaderError != nil { + return 0, uploaderError + } + + uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption) if uploadError != nil { return 0, uploadError } @@ -265,6 +277,12 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s PairMap: nil, Jwt: jwt, } - _, e = UploadData(buf, uploadOption) + + uploader, e := NewUploader() + if e != nil { + return e + } + + _, e = uploader.UploadData(buf, uploadOption) return e } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 6c6aec1b5..8b223e769 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -9,7 +9,7 @@ import ( "io" "mime" "mime/multipart" - "net" + "sync" "net/http" "net/textproto" "path/filepath" @@ -21,6 +21,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" ) type UploadOption struct { @@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN } } +var ( + fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") + uploader *Uploader + uploaderErr error + once sync.Once +) + // HTTPClient interface for testing type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } -var ( - HttpClient HTTPClient -) +// Uploader +type Uploader struct { + httpClient HTTPClient +} -func init() { - HttpClient = &http.Client{Transport: &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 10 * time.Second, - }).DialContext, - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} +func NewUploader() (*Uploader, error) { + once.Do(func () { + // With Dial context + var httpClient *util_http_client.HTTPClient + httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext) + if uploaderErr != nil { + uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr) + } + if httpClient != nil { + uploader = newUploader(httpClient) + } + }) + return uploader, uploaderErr +} + +func newUploader(httpClient HTTPClient) (*Uploader) { + return &Uploader{ + httpClient: httpClient, + } } // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. -func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { doUploadFunc := func() error { var host string @@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A return } -var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") - // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = retriedUploadData(data, option) +func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = doUpload(reader, option) +func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(reader, option) return } -func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul return } } - uploadResult, uploadErr := retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(data, option) return uploadResult, uploadErr, data } -func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = doUploadData(data, option) + uploadResult, err = uploader.doUploadData(data, option) if err == nil { uploadResult.RetryCount = i return @@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR return } -func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult } // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult uploadResult.Size = uint32(clearDataLen) } else { // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult return uploadResult, err } -func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } // print("+") - resp, post_err := HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err := uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) if post_err != nil { if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc() - resp, post_err = HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err = uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) } } if post_err != nil { diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go index 9682ca623..4a77fd04a 100644 --- a/weed/replication/repl_util/replication_util.go +++ b/weed/replication/repl_util/replication_util.go @@ -4,7 +4,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/replication/source" - "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error { @@ -21,7 +21,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS var shouldRetry bool for _, fileUrl := range fileUrls { - shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 63e1226b6..4bcbc7898 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) { @@ -88,9 +89,15 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) if err != nil { return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) - fileId, uploadResult, err, _ := operation.UploadWithRetry( + uploader, err := operation.NewUploader() + if err != nil { + glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err) + return "", fmt.Errorf("upload data: %v", err) + } + + fileId, uploadResult, err, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 167907a5a..768e251a4 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) type ReplicationSource interface { @@ -106,7 +107,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) { if fs.proxyByFiler { - return util.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") + return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "") } fileUrls, err := fs.LookupFileId(fileId) @@ -115,7 +116,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea } for _, fileUrl := range fileUrls { - filename, header, resp, err = util.DownloadFile(fileUrl, "") + filename, header, resp, err = util_http.DownloadFile(fileUrl, "") if err != nil { glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) } else { diff --git a/weed/s3api/s3api_acl_helper.go b/weed/s3api/s3api_acl_helper.go index 0332b6a39..b9fb1131e 100644 --- a/weed/s3api/s3api_acl_helper.go +++ b/weed/s3api/s3api_acl_helper.go @@ -9,9 +9,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "github.com/seaweedfs/seaweedfs/weed/util" "net/http" "strings" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) type AccountManager interface { @@ -32,7 +32,7 @@ func GetAccountId(r *http.Request) string { // ExtractAcl extracts the acl from the request body, or from the header if request body is empty func ExtractAcl(r *http.Request, accountManager AccountManager, ownership, bucketOwnerId, ownerId, accountId string) (grants []*s3.Grant, errCode s3err.ErrorCode) { if r.Body != nil && r.Body != http.NoBody { - defer util.CloseRequest(r) + defer util_http.CloseRequest(r) var acp s3.AccessControlPolicy err := xmlutil.UnmarshalXML(&acp, xml.NewDecoder(r.Body), "") diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index e3fa778a5..f451e60a2 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -13,7 +13,6 @@ import ( "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" @@ -26,6 +25,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { @@ -507,7 +507,7 @@ func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *htt } var v s3.OwnershipControls - defer util.CloseRequest(r) + defer util_http.CloseRequest(r) err := xmlutil.UnmarshalXML(&v, xml.NewDecoder(r.Body), "") if err != nil { diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index ff7e92304..3ab72285f 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -16,7 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { @@ -171,7 +171,7 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) if resp.StatusCode == http.StatusPreconditionFailed { s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed) diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 8d13fe17e..4ca8010d2 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) const ( @@ -87,12 +88,12 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request srcUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject)) - _, _, resp, err := util.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) + _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false)) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name) if tagErr != nil { @@ -175,12 +176,12 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req srcUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject)) - resp, dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader) + resp, dataReader, err := util_http.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) defer dataReader.Close() glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 5e46c1459..e0517ffb7 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -20,6 +20,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" ) type S3ApiServerOption struct { @@ -44,7 +46,7 @@ type S3ApiServer struct { cb *CircuitBreaker randomClientId int32 filerGuard *security.Guard - client *http.Client + client util_http_client.HTTPClientInterface bucketRegistry *BucketRegistry } @@ -84,10 +86,9 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer } s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer) if option.LocalFilerSocket == "" { - s3ApiServer.client = &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} + if s3ApiServer.client, err = util_http.NewGlobalHttpClient(); err != nil { + return nil, err + } } else { s3ApiServer.client = &http.Client{ Transport: &http.Transport{ diff --git a/weed/server/common.go b/weed/server/common.go index 7be2f8a76..e6f6cdb88 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -181,7 +181,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope PairMap: pu.PairMap, Jwt: assignResult.Auth, } - uploadResult, err := operation.UploadData(pu.Data, uploadOption) + uploader, err := operation.NewUploader() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + uploadResult, err := uploader.UploadData(pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index e04994569..c1a26ca11 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,24 +3,13 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "math/rand" "net/http" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -var ( - client *http.Client -) - -func init() { - client = &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} -} - func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) { encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite) @@ -71,14 +60,14 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques } } - proxyResponse, postErr := client.Do(proxyReq) + proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) w.WriteHeader(http.StatusInternalServerError) return } - defer util.CloseResponse(proxyResponse) + defer util_http.CloseResponse(proxyResponse) for k, v := range proxyResponse.Header { w.Header()[k] = v diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index b186fd34e..ab3988f8c 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -120,7 +121,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte fs.autoChunk(ctx, w, r, contentLength, so) } - util.CloseRequest(r) + util_http.CloseRequest(r) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 029fbb7c9..1c7ed0c3c 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -308,8 +308,14 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs PairMap: nil, Jwt: auth, } + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return uploaderErr + } + var uploadErr error - uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption) + uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption) if uploadErr != nil { return uploadErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 6cf7d65b1..f8d129bf3 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -53,7 +53,13 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht PairMap: pu.PairMap, Jwt: auth, } - uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption) + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr) + } + + uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 8c8eba078..d0d1575cf 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -158,7 +158,13 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil PairMap: pairMap, Jwt: auth, } - uploadResult, err, data := operation.Upload(limitedReader, uploadOption) + + uploader, err := operation.NewUploader() + if err != nil { + return nil, err, []byte{} + } + + uploadResult, err, data := uploader.Upload(limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 014bdb7f8..65fa622e7 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -30,6 +30,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) const ( @@ -256,7 +257,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } director(req) } - proxy.Transport = util.Transport + proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport() proxy.ServeHTTP(w, r) } } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 5e3e42dea..7479b5535 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -113,11 +114,11 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) location := ms.findVolumeLocation(collection, vid) if location.Error == "" { loc := location.Locations[rand.Intn(len(location.Locations))] - var url string + url, _ := util_http.NormalizeUrl(loc.PublicUrl) if r.URL.RawQuery != "" { - url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery + url = url + r.URL.Path + "?" + r.URL.RawQuery } else { - url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + url = url + r.URL.Path } http.Redirect(w, r, url, http.StatusPermanentRedirect) } else { diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 64254b3b8..4452e019b 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -70,10 +70,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser PairMap: nil, Jwt: security.EncodedJwt(req.Auth), } - if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil { - if err == nil { - err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err) - } + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil && err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr) + return + } + + if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr) } }(replica.Url) } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index ccbd42054..15d639f49 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -27,6 +27,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) @@ -81,7 +82,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } if vs.ReadMode == "proxy" { // proxy client request to target server - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url)) + rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url) + u, _ := url.Parse(rawURL) r.URL.Host = u.Host r.URL.Scheme = u.Scheme request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil) @@ -96,13 +98,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - response, err := client.Do(request) + response, err := util_http.GetGlobalHttpClient().Do(request) if err != nil { glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) InternalError(w) return } - defer util.CloseResponse(response) + defer util_http.CloseResponse(response) // proxy target response to client for k, vv := range response.Header { for _, v := range vv { @@ -116,7 +118,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } else { // redirect - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) + rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl) + u, _ := url.Parse(rawURL) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} if c := r.FormValue("collection"); c != "" { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 97d51dad7..f8d964552 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -392,8 +392,13 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, } func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr) + return nil, fmt.Errorf("upload data: %v", uploaderErr) + } - fileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( f.fs, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index f6d55c616..b77feb8e3 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -19,14 +19,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" -) - -var ( - client *http.Client + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func init() { - client = &http.Client{} Commands = append(Commands, &commandFsMergeVolumes{}) } @@ -104,7 +100,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer return nil } - defer client.CloseIdleConnections() + defer util_http.GetGlobalHttpClient().CloseIdleConnections() return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) { @@ -304,7 +300,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie if err != nil { return err } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) defer reader.Close() var filename string @@ -322,7 +318,12 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie isCompressed := resp.Header.Get("Content-Encoding") == "gzip" md5 := resp.Header.Get("Content-MD5") - _, err, _ = operation.Upload(reader, &operation.UploadOption{ + uploader, err := operation.NewUploader() + if err != nil { + return err + } + + _, err, _ = uploader.Upload(reader, &operation.UploadOption{ UploadUrl: uploadURL, Filename: filename, IsInputCompressed: isCompressed, @@ -348,12 +349,12 @@ func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) { } req.Header.Add("Accept-Encoding", "gzip") - r, err := client.Do(req) + r, err := util_http.GetGlobalHttpClient().Do(req) if err != nil { return nil, nil, err } if r.StatusCode >= 400 { - util.CloseResponse(r) + util_http.CloseResponse(r) return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index 2be61f72a..accce60ba 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func init() { @@ -90,7 +91,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload) fmt.Fprintf(writer, "purge %s\n", deleteUrl) - err = util.Delete(deleteUrl, string(encodedJwt)) + err = util_http.Delete(deleteUrl, string(encodedJwt)) if err != nil && err.Error() != "" { return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err) } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 1d27fae1d..dd58175cf 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -31,6 +31,7 @@ import ( "strings" "sync" "time" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func init() { @@ -552,9 +553,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) { fmt.Fprintf(c.writer, "HTTP delete request error: %v\n", err) } - client := &http.Client{} - - resp, err := client.Do(req) + resp, err := util_http.GetGlobalHttpClient().Do(req) if err != nil { fmt.Fprintf(c.writer, "DELETE fetch error: %v\n", err) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 82c2db79c..b4a7d649c 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -20,6 +20,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) { @@ -105,7 +106,12 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt BytesBuffer: bytesBuffer, } - _, err := operation.UploadData(n.Data, uploadOption) + uploader, err := operation.NewUploader() + if err != nil { + glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) + return err + } + _, err = uploader.UploadData(n.Data, uploadOption) if err != nil { glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String()) } @@ -144,7 +150,7 @@ func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOp if len(remoteLocations) > 0 { //send to other replica locations if err = DistributedOperation(remoteLocations, func(location operation.Location) error { - return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt)) + return util_http.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt)) }); err != nil { size = 0 } diff --git a/weed/util/http/client/http_client.go b/weed/util/http/client/http_client.go new file mode 100644 index 000000000..d1d2f5c56 --- /dev/null +++ b/weed/util/http/client/http_client.go @@ -0,0 +1,201 @@ +package client + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + util "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/spf13/viper" + "io" + "net/http" + "net/url" + "os" + "strings" + "sync" +) + +var ( + loadSecurityConfigOnce sync.Once +) + +type HTTPClient struct { + Client *http.Client + Transport *http.Transport + expectHttpsScheme bool +} + +func (httpClient *HTTPClient) Do(req *http.Request) (*http.Response, error) { + req.URL.Scheme = httpClient.GetHttpScheme() + return httpClient.Client.Do(req) +} + +func (httpClient *HTTPClient) Get(url string) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.Get(url) +} + +func (httpClient *HTTPClient) Post(url, contentType string, body io.Reader) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.Post(url, contentType, body) +} + +func (httpClient *HTTPClient) PostForm(url string, data url.Values) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.PostForm(url, data) +} + +func (httpClient *HTTPClient) Head(url string) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.Head(url) +} +func (httpClient *HTTPClient) CloseIdleConnections() { + httpClient.Client.CloseIdleConnections() +} + +func (httpClient *HTTPClient) GetClientTransport() *http.Transport { + return httpClient.Transport +} + +func (httpClient *HTTPClient) GetHttpScheme() string { + if httpClient.expectHttpsScheme { + return "https" + } + return "http" +} + +func (httpClient *HTTPClient) NormalizeHttpScheme(rawURL string) (string, error) { + expectedScheme := httpClient.GetHttpScheme() + + if !(strings.HasPrefix(rawURL, "http://") || strings.HasPrefix(rawURL, "https://")) { + return expectedScheme + "://" + rawURL, nil + } + + parsedURL, err := url.Parse(rawURL) + if err != nil { + return "", err + } + + if expectedScheme != parsedURL.Scheme { + parsedURL.Scheme = expectedScheme + } + return parsedURL.String(), nil +} + +func NewHttpClient(clientName ClientName, opts ...HttpClientOpt) (*HTTPClient, error) { + httpClient := HTTPClient{} + httpClient.expectHttpsScheme = checkIsHttpsClientEnabled(clientName) + var tlsConfig *tls.Config = nil + + if httpClient.expectHttpsScheme { + clientCertPair, err := getClientCertPair(clientName) + if err != nil { + return nil, err + } + + clientCaCert, clientCaCertName, err := getClientCaCert(clientName) + if err != nil { + return nil, err + } + + if clientCertPair != nil || len(clientCaCert) != 0 { + caCertPool, err := createHTTPClientCertPool(clientCaCert, clientCaCertName) + if err != nil { + return nil, err + } + + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + } + + if clientCertPair != nil { + tlsConfig.Certificates = append(tlsConfig.Certificates, *clientCertPair) + } + } + } + + httpClient.Transport = &http.Transport{ + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 1024, + TLSClientConfig: tlsConfig, + } + httpClient.Client = &http.Client{ + Transport: httpClient.Transport, + } + + for _, opt := range opts { + opt(&httpClient) + } + return &httpClient, nil +} + +func getStringOptionFromSecurityConfiguration(clientName ClientName, stringOptionName string) string { + util.LoadSecurityConfiguration() + return viper.GetString(fmt.Sprintf("https.%s.%s", clientName.LowerCaseString(), stringOptionName)) +} + +func getBoolOptionFromSecurityConfiguration(clientName ClientName, boolOptionName string) bool { + util.LoadSecurityConfiguration() + return viper.GetBool(fmt.Sprintf("https.%s.%s", clientName.LowerCaseString(), boolOptionName)) +} + +func checkIsHttpsClientEnabled(clientName ClientName) bool { + return getBoolOptionFromSecurityConfiguration(clientName, "enabled") +} + +func getFileContentFromSecurityConfiguration(clientName ClientName, fileType string) ([]byte, string, error) { + if fileName := getStringOptionFromSecurityConfiguration(clientName, fileType); fileName != "" { + fileContent, err := os.ReadFile(fileName) + if err != nil { + return nil, fileName, err + } + return fileContent, fileName, err + } + return nil, "", nil +} + +func getClientCertPair(clientName ClientName) (*tls.Certificate, error) { + certFileName := getStringOptionFromSecurityConfiguration(clientName, "cert") + keyFileName := getStringOptionFromSecurityConfiguration(clientName, "key") + if certFileName == "" && keyFileName == "" { + return nil, nil + } + if certFileName != "" && keyFileName != "" { + clientCert, err := tls.LoadX509KeyPair(certFileName, keyFileName) + if err != nil { + return nil, fmt.Errorf("error loading client certificate and key: %s", err) + } + return &clientCert, nil + } + return nil, fmt.Errorf("error loading key pair: key `%s` and certificate `%s`", keyFileName, certFileName) +} + +func getClientCaCert(clientName ClientName) ([]byte, string, error) { + return getFileContentFromSecurityConfiguration(clientName, "ca") +} + +func createHTTPClientCertPool(certContent []byte, fileName string) (*x509.CertPool, error) { + certPool := x509.NewCertPool() + if len(certContent) == 0 { + return certPool, nil + } + + ok := certPool.AppendCertsFromPEM(certContent) + if !ok { + return nil, fmt.Errorf("error processing certificate in %s", fileName) + } + return certPool, nil +} diff --git a/weed/util/http/client/http_client_interface.go b/weed/util/http/client/http_client_interface.go new file mode 100644 index 000000000..7a2d43360 --- /dev/null +++ b/weed/util/http/client/http_client_interface.go @@ -0,0 +1,16 @@ +package client + +import ( + "io" + "net/http" + "net/url" +) + +type HTTPClientInterface interface { + Do(req *http.Request) (*http.Response, error) + Get(url string) (resp *http.Response, err error) + Post(url, contentType string, body io.Reader) (resp *http.Response, err error) + PostForm(url string, data url.Values) (resp *http.Response, err error) + Head(url string) (resp *http.Response, err error) + CloseIdleConnections() +} diff --git a/weed/util/http/client/http_client_name.go b/weed/util/http/client/http_client_name.go new file mode 100644 index 000000000..aedaebbc6 --- /dev/null +++ b/weed/util/http/client/http_client_name.go @@ -0,0 +1,14 @@ +package client + +import "strings" + +type ClientName int + +//go:generate stringer -type=ClientName -output=http_client_name_string.go +const ( + Client ClientName = iota +) + +func (name *ClientName) LowerCaseString() string { + return strings.ToLower(name.String()) +} diff --git a/weed/util/http/client/http_client_name_string.go b/weed/util/http/client/http_client_name_string.go new file mode 100644 index 000000000..652fcdaac --- /dev/null +++ b/weed/util/http/client/http_client_name_string.go @@ -0,0 +1,23 @@ +// Code generated by "stringer -type=ClientName -output=http_client_name_string.go"; DO NOT EDIT. + +package client + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Client-0] +} + +const _ClientName_name = "Client" + +var _ClientName_index = [...]uint8{0, 6} + +func (i ClientName) String() string { + if i < 0 || i >= ClientName(len(_ClientName_index)-1) { + return "ClientName(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ClientName_name[_ClientName_index[i]:_ClientName_index[i+1]] +} diff --git a/weed/util/http/client/http_client_opt.go b/weed/util/http/client/http_client_opt.go new file mode 100644 index 000000000..1ff9d533d --- /dev/null +++ b/weed/util/http/client/http_client_opt.go @@ -0,0 +1,18 @@ +package client + +import ( + "net" + "time" +) + +type HttpClientOpt = func(clientCfg *HTTPClient) + +func AddDialContext(httpClient *HTTPClient) { + dialContext := (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 10 * time.Second, + }).DialContext + + httpClient.Transport.DialContext = dialContext + httpClient.Client.Transport = httpClient.Transport +} diff --git a/weed/util/http/http_global_client_init.go b/weed/util/http/http_global_client_init.go new file mode 100644 index 000000000..0dcb05cfd --- /dev/null +++ b/weed/util/http/http_global_client_init.go @@ -0,0 +1,27 @@ +package http + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" +) + +var ( + globalHttpClient *util_http_client.HTTPClient +) + +func NewGlobalHttpClient(opt ...util_http_client.HttpClientOpt) (*util_http_client.HTTPClient, error) { + return util_http_client.NewHttpClient(util_http_client.Client, opt...) +} + +func GetGlobalHttpClient() *util_http_client.HTTPClient { + return globalHttpClient +} + +func InitGlobalHttpClient() { + var err error + + globalHttpClient, err = NewGlobalHttpClient() + if err != nil { + glog.Fatalf("error init global http client: %v", err) + } +} diff --git a/weed/util/http_util.go b/weed/util/http/http_global_client_util.go similarity index 91% rename from weed/util/http_util.go rename to weed/util/http/http_global_client_util.go index 837b3ccb6..c3931a790 100644 --- a/weed/util/http_util.go +++ b/weed/util/http/http_global_client_util.go @@ -1,4 +1,4 @@ -package util +package http import ( "compress/gzip" @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/util/mem" + "github.com/seaweedfs/seaweedfs/weed/util" "io" "net/http" "net/url" @@ -15,23 +16,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" ) -var ( - client *http.Client - Transport *http.Transport -) - -func init() { - Transport = &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - } - client = &http.Client{ - Transport: Transport, - } -} - func Post(url string, values url.Values) ([]byte, error) { - r, err := client.PostForm(url, values) + r, err := GetGlobalHttpClient().PostForm(url, values) if err != nil { return nil, err } @@ -64,7 +50,7 @@ func GetAuthenticated(url, jwt string) ([]byte, bool, error) { maybeAddAuth(request, jwt) request.Header.Add("Accept-Encoding", "gzip") - response, err := client.Do(request) + response, err := GetGlobalHttpClient().Do(request) if err != nil { return nil, true, err } @@ -94,7 +80,7 @@ func GetAuthenticated(url, jwt string) ([]byte, bool, error) { } func Head(url string) (http.Header, error) { - r, err := client.Head(url) + r, err := GetGlobalHttpClient().Head(url) if err != nil { return nil, err } @@ -117,7 +103,7 @@ func Delete(url string, jwt string) error { if err != nil { return err } - resp, e := client.Do(req) + resp, e := GetGlobalHttpClient().Do(req) if e != nil { return e } @@ -145,7 +131,7 @@ func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err err if err != nil { return } - resp, err := client.Do(req) + resp, err := GetGlobalHttpClient().Do(req) if err != nil { return } @@ -159,7 +145,7 @@ func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err err } func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { - r, err := client.PostForm(url, values) + r, err := GetGlobalHttpClient().PostForm(url, values) if err != nil { return err } @@ -182,7 +168,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB } func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error { - r, err := client.PostForm(url, values) + r, err := GetGlobalHttpClient().PostForm(url, values) if err != nil { return err } @@ -201,7 +187,7 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head maybeAddAuth(req, jwt) - response, err := client.Do(req) + response, err := GetGlobalHttpClient().Do(req) if err != nil { return "", nil, nil, err } @@ -219,14 +205,11 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head } func Do(req *http.Request) (resp *http.Response, err error) { - return client.Do(req) + return GetGlobalHttpClient().Do(req) } -func NormalizeUrl(url string) string { - if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") { - return url - } - return "http://" + url +func NormalizeUrl(url string) (string, error) { + return GetGlobalHttpClient().NormalizeHttpScheme(url) } func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { @@ -249,7 +232,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC req.Header.Set("Accept-Encoding", "gzip") } - r, err := client.Do(req) + r, err := GetGlobalHttpClient().Do(req) if err != nil { return 0, err } @@ -322,7 +305,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } - r, err := client.Do(req) + r, err := GetGlobalHttpClient().Do(req) if err != nil { return true, err } @@ -368,12 +351,12 @@ func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed if err != nil { return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) } - decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) + decryptedData, err := util.Decrypt(encryptedData, util.CipherKey(cipherKey)) if err != nil { return false, fmt.Errorf("decrypt %s: %v", fileUrl, err) } if isContentCompressed { - decryptedData, err = DecompressData(decryptedData) + decryptedData, err = util.DecompressData(decryptedData) if err != nil { glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err) } @@ -403,7 +386,7 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt maybeAddAuth(req, jwt) - r, err := client.Do(req) + r, err := GetGlobalHttpClient().Do(req) if err != nil { return nil, nil, err } @@ -463,7 +446,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, var shouldRetry bool - for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 { + for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { n = 0 if strings.Contains(urlString, "%") { @@ -494,4 +477,4 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, return n, err -} +} \ No newline at end of file diff --git a/weed/weed.go b/weed/weed.go index a821cd72f..5139dd39c 100644 --- a/weed/weed.go +++ b/weed/weed.go @@ -20,6 +20,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var IsDebug *bool @@ -86,6 +87,7 @@ func main() { return } + util_http.InitGlobalHttpClient() for _, cmd := range commands { if cmd.Name() == args[0] && cmd.Run != nil { cmd.Flag.Usage = func() { cmd.Usage() }