2018-05-28 14:53:10 +08:00
package weed_server
import (
2019-03-16 06:55:34 +08:00
"context"
2018-05-28 14:53:10 +08:00
"io"
"net/http"
"path"
"strconv"
2018-07-22 08:47:59 +08:00
"strings"
2018-05-28 14:59:49 +08:00
"time"
2018-05-28 14:53:10 +08:00
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2019-02-15 16:09:19 +08:00
"github.com/chrislusf/seaweedfs/weed/security"
2019-06-23 13:53:52 +08:00
"github.com/chrislusf/seaweedfs/weed/stats"
2018-06-11 07:57:32 +08:00
"github.com/chrislusf/seaweedfs/weed/util"
2018-05-28 14:53:10 +08:00
)
2019-03-18 15:35:15 +08:00
func ( fs * FilerServer ) autoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
replication string , collection string , dataCenter string ) bool {
2018-05-28 14:53:10 +08:00
if r . Method != "POST" {
glog . V ( 4 ) . Infoln ( "AutoChunking not supported for method" , r . Method )
return false
}
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r . URL . Query ( )
parsedMaxMB , _ := strconv . ParseInt ( query . Get ( "maxMB" ) , 10 , 32 )
maxMB := int32 ( parsedMaxMB )
2018-07-07 17:18:47 +08:00
if maxMB <= 0 && fs . option . MaxMB > 0 {
maxMB = int32 ( fs . option . MaxMB )
2018-05-28 14:53:10 +08:00
}
if maxMB <= 0 {
glog . V ( 4 ) . Infoln ( "AutoChunking not enabled" )
return false
}
glog . V ( 4 ) . Infoln ( "AutoChunking level set to" , maxMB , "(MB)" )
chunkSize := 1024 * 1024 * maxMB
contentLength := int64 ( 0 )
if contentLengthHeader := r . Header [ "Content-Length" ] ; len ( contentLengthHeader ) == 1 {
contentLength , _ = strconv . ParseInt ( contentLengthHeader [ 0 ] , 10 , 64 )
if contentLength <= int64 ( chunkSize ) {
glog . V ( 4 ) . Infoln ( "Content-Length of" , contentLength , "is less than the chunk size of" , chunkSize , "so autoChunking will be skipped." )
return false
}
}
if contentLength <= 0 {
glog . V ( 4 ) . Infoln ( "Content-Length value is missing or unexpected so autoChunking will be skipped." )
return false
}
2019-03-16 06:55:34 +08:00
reply , err := fs . doAutoChunk ( ctx , w , r , contentLength , chunkSize , replication , collection , dataCenter )
2018-05-28 14:53:10 +08:00
if err != nil {
writeJsonError ( w , r , http . StatusInternalServerError , err )
} else if reply != nil {
writeJsonQuiet ( w , r , http . StatusCreated , reply )
}
return true
}
2019-03-18 15:35:15 +08:00
func ( fs * FilerServer ) doAutoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
contentLength int64 , chunkSize int32 , replication string , collection string , dataCenter string ) ( filerResult * FilerPostResult , replyerr error ) {
2018-05-28 14:53:10 +08:00
2019-06-23 13:53:52 +08:00
stats . FilerRequestCounter . WithLabelValues ( "postAutoChunk" ) . Inc ( )
start := time . Now ( )
2019-06-23 16:57:51 +08:00
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postAutoChunk" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
2019-06-23 13:53:52 +08:00
2018-05-28 14:53:10 +08:00
multipartReader , multipartReaderErr := r . MultipartReader ( )
if multipartReaderErr != nil {
return nil , multipartReaderErr
}
part1 , part1Err := multipartReader . NextPart ( )
if part1Err != nil {
return nil , part1Err
}
fileName := part1 . FileName ( )
if fileName != "" {
fileName = path . Base ( fileName )
}
var fileChunks [ ] * filer_pb . FileChunk
chunkOffset := int64 ( 0 )
2020-02-04 09:04:06 +08:00
for chunkOffset < contentLength {
limitedReader := io . LimitReader ( part1 , int64 ( chunkSize ) )
2018-05-28 14:53:10 +08:00
2020-02-04 09:04:06 +08:00
// assign one file id for one chunk
fileId , urlLocation , auth , assignErr := fs . assignNewFileInfo ( w , r , replication , collection , dataCenter )
if assignErr != nil {
return nil , assignErr
2018-05-28 14:53:10 +08:00
}
2020-02-04 09:04:06 +08:00
// upload the chunk to the volume server
chunkName := fileName + "_chunk_" + strconv . FormatInt ( int64 ( len ( fileChunks ) + 1 ) , 10 )
2020-03-07 22:06:58 +08:00
uploadResult , uploadErr := fs . doUpload ( urlLocation , w , r , limitedReader , chunkName , "" , fileId , auth )
2020-02-04 09:04:06 +08:00
if uploadErr != nil {
return nil , uploadErr
}
2018-05-28 14:53:10 +08:00
2020-02-04 09:04:06 +08:00
// if last chunk exhausted the reader exactly at the border
2020-03-07 22:06:58 +08:00
if uploadResult . Size == 0 {
2018-05-28 14:53:10 +08:00
break
}
2020-02-04 09:04:06 +08:00
// Save to chunk manifest structure
fileChunks = append ( fileChunks ,
& filer_pb . FileChunk {
2020-03-07 22:06:58 +08:00
FileId : fileId ,
Offset : chunkOffset ,
Size : uint64 ( uploadResult . Size ) ,
Mtime : time . Now ( ) . UnixNano ( ) ,
ETag : uploadResult . ETag ,
CipherKey : uploadResult . CipherKey ,
2020-02-04 09:04:06 +08:00
} ,
)
2020-03-07 22:06:58 +08:00
glog . V ( 4 ) . Infof ( "uploaded %s chunk %d to %s [%d,%d) of %d" , fileName , len ( fileChunks ) , fileId , chunkOffset , chunkOffset + int64 ( uploadResult . Size ) , contentLength )
2020-02-04 09:04:06 +08:00
2020-03-03 12:27:14 +08:00
// reset variables for the next chunk
2020-03-07 22:06:58 +08:00
chunkOffset = chunkOffset + int64 ( uploadResult . Size )
2020-03-03 12:27:14 +08:00
2020-02-04 09:04:06 +08:00
// if last chunk was not at full chunk size, but already exhausted the reader
2020-03-07 22:06:58 +08:00
if int64 ( uploadResult . Size ) < int64 ( chunkSize ) {
2020-02-04 09:04:06 +08:00
break
2018-05-28 14:53:10 +08:00
}
}
path := r . URL . Path
2018-07-22 08:47:59 +08:00
if strings . HasSuffix ( path , "/" ) {
if fileName != "" {
path += fileName
2018-05-28 14:53:10 +08:00
}
}
glog . V ( 4 ) . Infoln ( "saving" , path )
entry := & filer2 . Entry {
FullPath : filer2 . FullPath ( path ) ,
Attr : filer2 . Attr {
2018-06-11 07:57:32 +08:00
Mtime : time . Now ( ) ,
Crtime : time . Now ( ) ,
Mode : 0660 ,
2018-07-22 08:47:59 +08:00
Uid : OS_UID ,
Gid : OS_GID ,
2018-06-11 07:57:32 +08:00
Replication : replication ,
Collection : collection ,
TtlSec : int32 ( util . ParseInt ( r . URL . Query ( ) . Get ( "ttl" ) , 0 ) ) ,
2018-05-28 14:53:10 +08:00
} ,
Chunks : fileChunks ,
}
2020-02-04 09:04:06 +08:00
filerResult = & FilerPostResult {
Name : fileName ,
Size : chunkOffset ,
}
2020-01-23 03:42:40 +08:00
if dbErr := fs . filer . CreateEntry ( ctx , entry , false ) ; dbErr != nil {
2019-12-13 16:23:05 +08:00
fs . filer . DeleteChunks ( entry . Chunks )
2019-06-23 13:53:52 +08:00
replyerr = dbErr
filerResult . Error = dbErr . Error ( )
glog . V ( 0 ) . Infof ( "failing to write %s to filer server : %v" , path , dbErr )
2018-05-28 14:53:10 +08:00
return
}
return
}
2019-03-18 15:35:15 +08:00
func ( fs * FilerServer ) doUpload ( urlLocation string , w http . ResponseWriter , r * http . Request ,
2020-03-07 22:06:58 +08:00
limitedReader io . Reader , fileName string , contentType string , fileId string , auth security . EncodedJwt ) ( * operation . UploadResult , error ) {
2018-05-28 14:53:10 +08:00
2019-06-23 13:53:52 +08:00
stats . FilerRequestCounter . WithLabelValues ( "postAutoChunkUpload" ) . Inc ( )
start := time . Now ( )
2019-06-23 16:57:51 +08:00
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postAutoChunkUpload" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
2019-06-23 13:53:52 +08:00
2020-03-07 22:06:58 +08:00
return operation . Upload ( urlLocation , fileName , fs . option . Cipher , limitedReader , false , contentType , nil , auth )
2018-05-28 14:53:10 +08:00
}