filer.copy: retryable file part upload

This commit is contained in:
chrislu 2022-08-20 18:59:57 -07:00
parent a3553da7f7
commit 6c8822f269

View File

@ -454,58 +454,41 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
wg.Done()
<-concurrentChunks
}()
// assign a volume
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := util.Retry("assignVolume", func() error {
return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: task.destinationUrlPath + fileName,
}
assignResult, assignError = client.AssignVolume(context.Background(), request)
if assignError != nil {
return fmt.Errorf("assign volume failure %v: %v", request, assignError)
}
if assignResult.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
}
return nil
})
})
fileId, uploadResult, err, _ := operation.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: task.destinationUrlPath + fileName,
},
&operation.UploadOption{
Filename: fileName + "-" + strconv.FormatInt(i+1, 10),
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
},
func(host, fileId string) string {
return fmt.Sprintf("http://%s/%s", host, fileId)
},
io.NewSectionReader(f, i*chunkSize, chunkSize),
)
if err != nil {
uploadError = fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
return
}
targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
uploadOption := &operation.UploadOption{
UploadUrl: targetUrl,
Filename: fileName + "-" + strconv.FormatInt(i+1, 10),
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: security.EncodedJwt(assignResult.Auth),
}
uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption)
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err)
return
}
if uploadResult.Error != "" {
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error)
return
}
chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize)
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
}
wg.Wait()